From 50fa9b89f42bd3fe6aad5086b0df14a00dadb24b Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 14 Dec 2011 19:55:22 +0000 Subject: [PATCH] MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. Contributed by Siddharth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214429 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 36 +++-- .../yarn/webapp/log/AggregatedLogsBlock.java | 4 +- .../yarn/server/api/records/NodeStatus.java | 4 + .../api/records/impl/pb/NodeStatusPBImpl.java | 116 ++++++++++++--- .../proto/yarn_server_common_protos.proto | 1 + .../src/main/resources/yarn-default.xml | 12 +- .../nodemanager/NodeStatusUpdaterImpl.java | 68 +++++++++ .../ContainerManagerImpl.java | 4 +- .../logaggregation/AppLogAggregatorImpl.java | 1 + .../nodemanager/webapp/NMController.java | 4 +- .../nodemanager/TestNodeStatusUpdater.java | 140 +++++++++++++++++- .../TestNonAggregatingLogHandler.java | 4 +- .../server/resourcemanager/RMAppManager.java | 2 +- .../ResourceTrackerService.java | 3 +- .../resourcemanager/rmnode/RMNodeImpl.java | 4 +- .../rmnode/RMNodeStatusEvent.java | 13 +- .../security/DelegationTokenRenewer.java | 111 +++++++++++++- .../security/TestDelegationTokenRenewer.java | 128 +++++++++++++++- .../webapp/TestRMWebServices.java | 2 +- .../src/site/apt/ClusterSetup.apt.vm | 8 +- 21 files changed, 606 insertions(+), 62 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index de372342d7..c73b06f044 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -296,6 +296,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3541. Fix broken TestJobQueueClient test. (Ravi Prakash via mahadev) + MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. 
+ (Siddharth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fa4ffa1656..1133d78816 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -91,12 +91,7 @@ public class YarnConfiguration extends Configuration { public static final String RM_CLIENT_THREAD_COUNT = RM_PREFIX + "client.thread-count"; public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10; - - /** The expiry interval for application master reporting.*/ - public static final String RM_AM_EXPIRY_INTERVAL_MS = - RM_PREFIX + "am.liveness-monitor.expiry-interval-ms"; - public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000; - + /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = RM_PREFIX + "principal"; @@ -126,7 +121,17 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8025; public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS = "0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT; - + + /** The expiry interval for application master reporting.*/ + public static final String RM_AM_EXPIRY_INTERVAL_MS = + YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"; + public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000; + + /** How long to wait until a node manager is considered dead.*/ + public static final String RM_NM_EXPIRY_INTERVAL_MS = + YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms"; + public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000; + /** Are acls enabled.*/ public static final String YARN_ACL_ENABLE = YARN_PREFIX + "acl.enable"; @@ -160,12 +165,7 @@ public class YarnConfiguration extends Configuration { /** The keytab for the resource manager.*/ public static final String RM_KEYTAB = RM_PREFIX + "keytab"; - - /** How long to wait until a node manager is considered dead.*/ - public static final String RM_NM_EXPIRY_INTERVAL_MS = - RM_PREFIX + "nm.liveness-monitor.expiry-interval-ms"; - public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000; - + /** How long to wait until a container is considered dead.*/ public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; @@ -293,10 +293,16 @@ public class YarnConfiguration extends Configuration { public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; + /** Interval at which the delayed token removal thread runs */ + public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = + RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; + public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = + 30000l; + /** Whether to enable log aggregation */ - public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX + public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; - public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false; + public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; /** * 
Number of seconds to retain logs on the NodeManager. Only applicable if Log diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 5698e04dc8..766bf867b9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -53,8 +53,8 @@ protected void render(Block html) { logEntity = containerId.toString(); } - if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { html.h1() ._("Aggregation is not enabled. Try the nodemanager at " + nodeId) ._(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 7822789eb0..25602663c5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -33,6 +34,9 @@ public interface NodeStatus { public abstract void setContainersStatuses( List containersStatuses); + public abstract List getKeepAliveApplications(); + public abstract void setKeepAliveApplications(List appIds); + NodeHealthStatus getNodeHealthStatus(); void setNodeHealthStatus(NodeHealthStatus healthStatus); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 03d5e8cdbd..8b5ff01a94 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -23,13 +23,16 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import 
org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -public class NodeStatusPBImpl extends ProtoBase implements NodeStatus { + +public class NodeStatusPBImpl extends ProtoBase implements + NodeStatus { NodeStatusProto proto = NodeStatusProto.getDefaultInstance(); NodeStatusProto.Builder builder = null; boolean viaProto = false; @@ -45,6 +50,7 @@ public class NodeStatusPBImpl extends ProtoBase implements Node private NodeId nodeId = null; private List containers = null; private NodeHealthStatus nodeHealthStatus = null; + private List keepAliveApplications = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -55,15 +61,14 @@ public NodeStatusPBImpl(NodeStatusProto proto) { viaProto = true; } - public NodeStatusProto getProto() { - - mergeLocalToProto(); + public synchronized NodeStatusProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } - private void mergeLocalToBuilder() { + private synchronized void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } @@ -73,9 +78,12 @@ private void mergeLocalToBuilder() { if (this.nodeHealthStatus != null) { builder.setNodeHealthStatus(convertToProtoFormat(this.nodeHealthStatus)); } + if (this.keepAliveApplications != null) { + addKeepAliveApplicationsToProto(); + } } - private void mergeLocalToProto() { + private synchronized void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); mergeLocalToBuilder(); @@ -84,14 +92,14 @@ private void mergeLocalToProto() { viaProto = true; } - private void maybeInitBuilder() { + private synchronized void maybeInitBuilder() { if (viaProto || builder == null) { builder = NodeStatusProto.newBuilder(proto); } viaProto = false; } - private void addContainersToProto() { + private synchronized void addContainersToProto() { maybeInitBuilder(); builder.clearContainersStatuses(); if (containers == null) @@ -124,19 +132,53 @@ public void remove() { }; builder.addAllContainersStatuses(iterable); } + + private synchronized void addKeepAliveApplicationsToProto() { + maybeInitBuilder(); + builder.clearKeepAliveApplications(); + if (keepAliveApplications == null) + return; + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = keepAliveApplications.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllKeepAliveApplications(iterable); + } @Override - public int getResponseId() { + public synchronized int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; return p.getResponseId(); } @Override - public void setResponseId(int responseId) { + public synchronized void setResponseId(int responseId) { maybeInitBuilder(); builder.setResponseId(responseId); } @Override - public NodeId getNodeId() { + public synchronized NodeId getNodeId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; if (this.nodeId != null) { return this.nodeId; @@ -148,8 +190,9 @@ public NodeId getNodeId() { return this.nodeId; } + @Override - public void setNodeId(NodeId nodeId) { + public synchronized void setNodeId(NodeId nodeId) { maybeInitBuilder(); if (nodeId == null) builder.clearNodeId(); @@ -158,20 +201,35 @@ public void setNodeId(NodeId nodeId) { } @Override - public List getContainersStatuses() { + public synchronized List getContainersStatuses() { initContainers(); return this.containers; } @Override - public void setContainersStatuses(List containers) { + public synchronized void setContainersStatuses( + List containers) { if (containers == null) { builder.clearContainersStatuses(); } this.containers = containers; } + + @Override + public synchronized List getKeepAliveApplications() { + initKeepAliveApplications(); + return this.keepAliveApplications; + } + + @Override + public synchronized void setKeepAliveApplications(List appIds) { + if (appIds == null) { + builder.clearKeepAliveApplications(); + } + this.keepAliveApplications = appIds; + } - private void initContainers() { + private synchronized void initContainers() { if (this.containers != null) { return; } @@ -185,8 +243,22 @@ private void initContainers() { } + private synchronized void initKeepAliveApplications() { + if (this.keepAliveApplications != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getKeepAliveApplicationsList(); + this.keepAliveApplications = new ArrayList(); + + for (ApplicationIdProto c : list) { + this.keepAliveApplications.add(convertFromProtoFormat(c)); + } + + } + @Override - public NodeHealthStatus getNodeHealthStatus() { + public synchronized NodeHealthStatus getNodeHealthStatus() { NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; if (nodeHealthStatus != null) { return nodeHealthStatus; @@ -199,7 +271,7 @@ public NodeHealthStatus getNodeHealthStatus() { } @Override - public void setNodeHealthStatus(NodeHealthStatus healthStatus) { + public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) { maybeInitBuilder(); if (healthStatus == null) { builder.clearNodeHealthStatus(); @@ -231,4 +303,12 @@ private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } -} + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) { + return new ApplicationIdPBImpl(c); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId c) { + return ((ApplicationIdPBImpl)c).getProto(); + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index b2e995f45a..4f5543e5ca 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -34,6 +34,7 @@ message NodeStatusProto { optional int32 response_id = 2; repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; + repeated ApplicationIdProto keep_alive_applications = 5; } message RegistrationResponseProto { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index fdb7cb6c5b..25e25cd0a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -72,7 +72,7 @@ The expiry interval for application master reporting. - yarn.resourcemanager.am.liveness-monitor.expiry-interval-ms + yarn.am.liveness-monitor.expiry-interval-ms 600000 @@ -155,7 +155,7 @@ How long to wait until a node manager is considered dead. - yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms + yarn.nm.liveness-monitor.expiry-interval-ms 600000 @@ -210,6 +210,12 @@ 10000 + + Interval at which the delayed token removal thread runs + yarn.resourcemanager.delayed.delegation-token.removal-interval-ms + 30000 + + address of node manager IPC. 
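
For reference, the yarn-default.xml hunks above track the configuration-key changes made in YarnConfiguration earlier in this patch: yarn.nodemanager.log-aggregation-enable becomes yarn.log-aggregation-enable, the AM and NM liveness-monitor expiry intervals move from the yarn.resourcemanager.* prefix to the yarn.* prefix, and yarn.resourcemanager.delayed.delegation-token.removal-interval-ms is introduced for the RM's delayed token-removal thread. A minimal sketch (not part of the patch; the class name is illustrative) of setting these keys programmatically through the new constants, mirroring what the tests added by this patch do:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Illustrative only: exercises the renamed/new keys via the constants
    // defined in the YarnConfiguration changes in this patch.
    public class LogAggregationKeepAliveConfig {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Renamed from yarn.nodemanager.log-aggregation-enable:
        conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
        // Moved from the yarn.resourcemanager.* prefix to yarn.*; also reused
        // as the token keep-alive delay on the NM and RM:
        conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 600000L);
        // New key: how often the delayed delegation-token removal thread runs.
        conf.setLong(
            YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
            30000L);
        System.out.println(conf.get("yarn.log-aggregation-enable"));
      }
    }
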
@@ -304,7 +310,7 @@ Whether to enable log aggregation - yarn.nodemanager.log-aggregation-enable + yarn.log-aggregation-enable false diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 6da70f1502..f0007a657a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -20,8 +20,12 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.Map.Entry; import org.apache.avro.AvroRuntimeException; @@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; + public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -76,6 +81,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private byte[] secretKeyBytes = new byte[0]; private boolean isStopped; private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private boolean tokenKeepAliveEnabled; + private long tokenRemovalDelayMs; + /** Keeps track of when the next keep alive request should be sent for an app*/ + private Map appTokenKeepAliveMap = + new HashMap(); + private Random keepAliveDelayRandom = new Random(); private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -103,6 +114,13 @@ public synchronized void init(Configuration conf) { this.totalResource = recordFactory.newRecordInstance(Resource.class); this.totalResource.setMemory(memoryMb); metrics.addResource(totalResource); + this.tokenKeepAliveEnabled = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) + && isSecurityEnabled(); + this.tokenRemovalDelayMs = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); super.init(conf); } @@ -139,6 +157,10 @@ public synchronized void stop() { super.stop(); } + protected boolean isSecurityEnabled() { + return UserGroupInformation.isSecurityEnabled(); + } + protected ResourceTracker getRMClient() { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); @@ -188,6 +210,29 @@ public byte[] getRMNMSharedSecret() { return this.secretKeyBytes.clone(); } + private List createKeepAliveApplicationList() { + if (!tokenKeepAliveEnabled) { + return Collections.emptyList(); + } + + List appList = new ArrayList(); + for (Iterator> i = + this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) { + Entry e = i.next(); + ApplicationId appId = e.getKey(); + Long nextKeepAlive = e.getValue(); + if (!this.context.getApplications().containsKey(appId)) { + // Remove if the application has finished. + i.remove(); + } else if (System.currentTimeMillis() > nextKeepAlive) { + // KeepAlive list for the next hearbeat. 
+ appList.add(appId); + trackAppForKeepAlive(appId); + } + } + return appList; + } + private NodeStatus getNodeStatus() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); @@ -231,9 +276,29 @@ private NodeStatus getNodeStatus() { } nodeStatus.setNodeHealthStatus(nodeHealthStatus); + List keepAliveAppIds = createKeepAliveApplicationList(); + nodeStatus.setKeepAliveApplications(keepAliveAppIds); + return nodeStatus; } + private void trackAppsForKeepAlive(List appIds) { + if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { + for (ApplicationId appId : appIds) { + trackAppForKeepAlive(appId); + } + } + } + + private void trackAppForKeepAlive(ApplicationId appId) { + // Next keepAlive request for app between 0.7 & 0.9 of when the token will + // likely expire. + long nextTime = System.currentTimeMillis() + + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs + * keepAliveDelayRandom.nextInt(100))/100); + appTokenKeepAliveMap.put(appId, nextTime); + } + @Override public void sendOutofBandHeartBeat() { synchronized (this.heartbeatMonitor) { @@ -245,6 +310,7 @@ protected void startStatusUpdater() { new Thread("Node Status Updater") { @Override + @SuppressWarnings("unchecked") public void run() { int lastHeartBeatID = 0; while (!isStopped) { @@ -284,6 +350,8 @@ public void run() { } List appsToCleanup = response.getApplicationsToCleanupList(); + //Only start tracking for keepAlive on FINISH_APP + trackAppsForKeepAlive(appsToCleanup); if (appsToCleanup.size() != 0) { dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 615b825c4f..3169f2f1b8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -192,8 +192,8 @@ private void addIfService(Object object) { protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { - if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { + if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { return new LogAggregationService(this.dispatcher, context, deletionService, dirsHandler); } else { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 5cfcc0d2ea..fdd4ecb2f6 100644 --- 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -170,6 +170,7 @@ public void run() { this.writer.closeWriter(); LOG.info("Finished aggregate log-file for app " + this.applicationId); } + try { userUgi.doAs(new PrivilegedExceptionAction() { @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java index 033271afde..86e25056bb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java @@ -88,8 +88,8 @@ public void logs() { containerId.getApplicationAttemptId().getApplicationId(); Application app = nmContext.getApplications().get(appId); if (app == null - && nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { + && nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL); String redirectUrl = null; if (logServerUrl == null || logServerUrl.isEmpty()) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index c1462746ff..cfb32679a6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -22,7 +22,9 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -63,10 
+66,12 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.mockito.Mockito.mock; public class TestNodeStatusUpdater { @@ -216,7 +221,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) HeartbeatResponse response = recordFactory .newRecordInstance(HeartbeatResponse.class); response.setResponseId(heartBeatID); - + NodeHeartbeatResponse nhResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); nhResponse.setHeartbeatResponse(response); @@ -241,6 +246,48 @@ protected ResourceTracker getRMClient() { return resourceTracker; } } + + private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker; + private Context context; + + public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + ContainerTokenSecretManager containerTokenSecretManager) { + super(context, dispatcher, healthChecker, metrics, + containerTokenSecretManager); + this.context = context; + this.resourceTracker = new MyResourceTracker3(this.context); + } + + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected boolean isSecurityEnabled() { + return true; + } + } + + private class MyNodeManager extends NodeManager { + + private MyNodeStatusUpdater3 nodeStatusUpdater; + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + ContainerTokenSecretManager containerTokenSecretManager) { + this.nodeStatusUpdater = + new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics, + containerTokenSecretManager); + return this.nodeStatusUpdater; + } + + protected MyNodeStatusUpdater3 getNodeStatusUpdater() { + return this.nodeStatusUpdater; + } + } // private class MyResourceTracker2 implements ResourceTracker { @@ -276,6 +323,65 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + private class MyResourceTracker3 implements ResourceTracker { + public NodeAction heartBeatNodeAction = NodeAction.NORMAL; + public NodeAction registerNodeAction = NodeAction.NORMAL; + private Map> keepAliveRequests = + new HashMap>(); + private ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + private final Context context; + + MyResourceTracker3(Context context) { + this.context = context; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + RegistrationResponse regResponse = + recordFactory.newRecordInstance(RegistrationResponse.class); + regResponse.setNodeAction(registerNodeAction); + response.setRegistrationResponse(regResponse); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + LOG.info("Got heartBeatId: [" + heartBeatID +"]"); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + HeartbeatResponse response = + recordFactory.newRecordInstance(HeartbeatResponse.class); + 
response.setResponseId(heartBeatID); + response.setNodeAction(heartBeatNodeAction); + + if (nodeStatus.getKeepAliveApplications() != null + && nodeStatus.getKeepAliveApplications().size() > 0) { + for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) { + List list = keepAliveRequests.get(appId); + if (list == null) { + list = new LinkedList(); + keepAliveRequests.put(appId, list); + } + list.add(System.currentTimeMillis()); + } + } + if (heartBeatID == 2) { + LOG.info("Sending FINISH_APP for application: [" + appId + "]"); + this.context.getApplications().put(appId, mock(Application.class)); + response.addAllApplicationsToCleanup(Collections.singletonList(appId)); + } + NodeHeartbeatResponse nhResponse = + recordFactory.newRecordInstance(NodeHeartbeatResponse.class); + nhResponse.setHeartbeatResponse(response); + return nhResponse; + } + } + @Before public void clearError() { nmStartError = null; @@ -456,6 +562,38 @@ public void start() { verifyNodeStartFailure("Starting of RPC Server failed"); } + @Test + public void testApplicationKeepAlive() throws Exception { + MyNodeManager nm = new MyNodeManager(); + try { + YarnConfiguration conf = createNMConfig(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + 4000l); + nm.init(conf); + nm.start(); + // HB 2 -> app cancelled by RM. + while (heartBeatID < 12) { + Thread.sleep(1000l); + } + MyResourceTracker3 rt = + (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); + rt.context.getApplications().remove(rt.appId); + Assert.assertEquals(1, rt.keepAliveRequests.size()); + int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); + LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]"); + Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3); + while (heartBeatID < 20) { + Thread.sleep(1000l); + } + int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size(); + Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2); + } finally { + if (nm.getServiceState() == STATE.STARTED) + nm.stop(); + } + } + private void verifyNodeStartFailure(String errMessage) { YarnConfiguration conf = createNMConfig(); nm.init(conf); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index a5e5eb06bc..bbee9c5848 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -68,7 +68,7 @@ public void testLogDeletion() { + localLogDirs[1].getAbsolutePath(); conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); - conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); DrainDispatcher dispatcher = createDispatcher(conf); @@ -142,7 
+142,7 @@ public void testDelayedDelete() { + localLogDirs[1].getAbsolutePath(); conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); - conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4c4334c4dc..ad28d6c910 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -173,7 +173,7 @@ protected synchronized void finishApplication(ApplicationId applicationId) { } else { // Inform the DelegationTokenRenewer if (UserGroupInformation.isSecurityEnabled()) { - rmContext.getDelegationTokenRenewer().removeApplication(applicationId); + rmContext.getDelegationTokenRenewer().applicationFinished(applicationId); } completedApps.add(applicationId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a01a0bf42e..58697486d5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -272,7 +272,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 4. Send status to RMNode, saving the latest response. 
this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), latestResponse)); + remoteNodeStatus.getContainersStatuses(), + remoteNodeStatus.getKeepAliveApplications(), latestResponse)); nodeHeartBeatResponse.setHeartbeatResponse(latestResponse); return nodeHeartBeatResponse; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 30109edbc0..24fc80d442 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -414,7 +414,9 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, completedContainers)); - + + rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( + statusEvent.getKeepAliveAppIds()); return RMNodeState.RUNNING; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index e4a2930168..1285c2bed9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -28,15 +29,17 @@ public class RMNodeStatusEvent extends RMNodeEvent { private final NodeHealthStatus nodeHealthStatus; - private List containersCollection; + private final List containersCollection; private final HeartbeatResponse latestResponse; + private final List keepAliveAppIds; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, - List collection, + List collection, List keepAliveAppIds, HeartbeatResponse latestResponse) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; + this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; } @@ -51,4 +54,8 @@ public List getContainers() { public HeartbeatResponse getLatestResponse() { return this.latestResponse; } -} + + public List getKeepAliveAppIds() { + return this.keepAliveAppIds; + } +} \ No newline at end of file diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index a7d2e4582d..d837f7d759 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -20,14 +20,19 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; @@ -40,6 +45,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.AbstractService; /** @@ -65,7 +71,16 @@ public class DelegationTokenRenewer extends AbstractService { // appId=>List private Set delegationTokens = Collections.synchronizedSet(new HashSet()); + + private final ConcurrentMap delayedRemovalMap = + new ConcurrentHashMap(); + private long tokenRemovalDelayMs; + + private Thread delayedRemovalThread; + + private boolean tokenKeepAliveEnabled; + public DelegationTokenRenewer() { super(DelegationTokenRenewer.class.getName()); } @@ -73,6 +88,12 @@ public DelegationTokenRenewer() { @Override public synchronized void init(Configuration conf) { super.init(conf); + this.tokenKeepAliveEnabled = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + this.tokenRemovalDelayMs = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); } @Override @@ -81,6 +102,12 @@ public synchronized void start() { dtCancelThread.start(); renewalTimer = new Timer(true); + if (tokenKeepAliveEnabled) { + delayedRemovalThread = + new Thread(new DelayedTokenRemovalRunnable(getConfig()), + "DelayedTokenCanceller"); + delayedRemovalThread.start(); + } } @Override @@ -94,6 +121,14 @@ public synchronized void stop() { } catch (InterruptedException e) { e.printStackTrace(); } + if (tokenKeepAliveEnabled && delayedRemovalThread != null) { + delayedRemovalThread.interrupt(); + try { + delayedRemovalThread.join(1000); + } catch (InterruptedException e) { + LOG.info("Interrupted while joining on delayed removal thread.", e); + } + } super.stop(); } @@ -343,12 +378,38 @@ private void removeFailedDelegationToken(DelegationTokenToRenew t) { if(t.timerTask!=null) t.timerTask.cancel(); } - + /** * Removing delegation token for completed applications. 
* @param applicationId completed application */ - public void removeApplication(ApplicationId applicationId) { + public void applicationFinished(ApplicationId applicationId) { + if (!tokenKeepAliveEnabled) { + removeApplicationFromRenewal(applicationId); + } else { + delayedRemovalMap.put(applicationId, System.currentTimeMillis() + + tokenRemovalDelayMs); + } + } + + /** + * Add a list of applications to the keep alive list. If an appId already + * exists, update it's keep-alive time. + * + * @param appIds + * the list of applicationIds to be kept alive. + * + */ + public void updateKeepAliveApplications(List appIds) { + if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { + for (ApplicationId appId : appIds) { + delayedRemovalMap.put(appId, System.currentTimeMillis() + + tokenRemovalDelayMs); + } + } + } + + private void removeApplicationFromRenewal(ApplicationId applicationId) { synchronized (delegationTokens) { Iterator it = delegationTokens.iterator(); while(it.hasNext()) { @@ -371,4 +432,50 @@ public void removeApplication(ApplicationId applicationId) { } } } + + /** + * Takes care of cancelling app delegation tokens after the configured + * cancellation delay, taking into consideration keep-alive requests. + * + */ + private class DelayedTokenRemovalRunnable implements Runnable { + + private long waitTimeMs; + + DelayedTokenRemovalRunnable(Configuration conf) { + waitTimeMs = + conf.getLong( + YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS); + } + + @Override + public void run() { + List toCancel = new ArrayList(); + while (!Thread.currentThread().isInterrupted()) { + Iterator> it = + delayedRemovalMap.entrySet().iterator(); + toCancel.clear(); + while (it.hasNext()) { + Entry e = it.next(); + if (e.getValue() < System.currentTimeMillis()) { + toCancel.add(e.getKey()); + } + } + for (ApplicationId appId : toCancel) { + removeApplicationFromRenewal(appId); + delayedRemovalMap.remove(appId); + } + synchronized (this) { + try { + wait(waitTimeMs); + } catch (InterruptedException e) { + LOG.info("Delayed Deletion Thread Interrupted. 
Shutting it down"); + return; + } + } + } + } + } + } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9448fe0ed4..4184465451 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -20,12 +20,12 @@ import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -328,7 +329,7 @@ public void testDTRenewal () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); delegationTokenRenewer.addApplication(applicationId_1, ts, true); - delegationTokenRenewer.removeApplication(applicationId_1); + delegationTokenRenewer.applicationFinished(applicationId_1); numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { @@ -343,7 +344,7 @@ public void testDTRenewal () throws Exception { // also renewing of the cancelled token should fail try { token4.renew(conf); - assertTrue("Renewal of canceled token didn't fail", false); + fail("Renewal of cancelled token should have failed"); } catch (InvalidToken ite) { //expected } @@ -377,7 +378,7 @@ public void testDTRenewalWithNoCancel () throws Exception { ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1); delegationTokenRenewer.addApplication(applicationId_1, ts, false); - delegationTokenRenewer.removeApplication(applicationId_1); + delegationTokenRenewer.applicationFinished(applicationId_1); int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { @@ -393,4 +394,123 @@ public void testDTRenewalWithNoCancel () throws Exception { // been canceled token1.renew(conf); } + + /** + * Basic idea of the test: + * 0. Setup token KEEP_ALIVE + * 1. create tokens. + * 2. register them for renewal - to be cancelled on app complete + * 3. Complete app. + * 4. Verify token is alive within the KEEP_ALIVE time + * 5. Verify token has been cancelled after the KEEP_ALIVE_TIME + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testDTKeepAlive1 () throws Exception { + DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); + Configuration lconf = new Configuration(conf); + lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + //Keep tokens alive for 6 seconds. 
+ lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l); + //Try removing tokens every second. + lconf.setLong( + YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, + 1000l); + localDtr.init(lconf); + localDtr.start(); + + MyFS dfs = (MyFS)FileSystem.get(lconf); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode()); + + Credentials ts = new Credentials(); + // get the delegation tokens + MyToken token1 = dfs.getDelegationToken(new Text("user1")); + + String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0"; + ts.addToken(new Text(nn1), token1); + + // register the tokens for renewal + ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); + localDtr.addApplication(applicationId_0, ts, true); + localDtr.applicationFinished(applicationId_0); + + Thread.sleep(3000l); + + //Token should still be around. Renewal should not fail. + token1.renew(lconf); + + //Allow the keepalive time to run out + Thread.sleep(6000l); + + //The token should have been cancelled at this point. Renewal will fail. + try { + token1.renew(lconf); + fail("Renewal of cancelled token should have failed"); + } catch (InvalidToken ite) {} + } + + /** + * Basic idea of the test: + * 0. Setup token KEEP_ALIVE + * 1. create tokens. + * 2. register them for renewal - to be cancelled on app complete + * 3. Complete app. + * 4. Verify token is alive within the KEEP_ALIVE time + * 5. Send an explicity KEEP_ALIVE_REQUEST + * 6. Verify token KEEP_ALIVE time is renewed. + * 7. Verify token has been cancelled after the renewed KEEP_ALIVE_TIME. + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testDTKeepAlive2() throws Exception { + DelegationTokenRenewer localDtr = new DelegationTokenRenewer(); + Configuration lconf = new Configuration(conf); + lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + //Keep tokens alive for 6 seconds. + lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l); + //Try removing tokens every second. + lconf.setLong( + YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, + 1000l); + localDtr.init(lconf); + localDtr.start(); + + MyFS dfs = (MyFS)FileSystem.get(lconf); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode()); + + Credentials ts = new Credentials(); + // get the delegation tokens + MyToken token1 = dfs.getDelegationToken(new Text("user1")); + + String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0"; + ts.addToken(new Text(nn1), token1); + + // register the tokens for renewal + ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0); + localDtr.addApplication(applicationId_0, ts, true); + localDtr.applicationFinished(applicationId_0); + + Thread.sleep(4000l); + + //Send another keep alive. + localDtr.updateKeepAliveApplications(Collections + .singletonList(applicationId_0)); + //Renewal should not fail. + token1.renew(lconf); + + //Token should be around after this. + Thread.sleep(4500l); + //Renewal should not fail. - ~1.5 seconds for keepalive timeout. + token1.renew(lconf); + + //Allow the keepalive time to run out + Thread.sleep(3000l); + //The token should have been cancelled at this point. Renewal will fail. 
+ try { + token1.renew(lconf); + fail("Renewal of cancelled token should have failed"); + } catch (InvalidToken ite) {} + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index 8f06500af5..b19e7b54e5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -395,7 +395,7 @@ public void testNodesQueryHealthyAndState() throws JSONException, Exception { nodeHealth.setHealthReport("test health report"); nodeHealth.setIsNodeHealthy(false); node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth, - new ArrayList(), null)); + new ArrayList(), null, null)); rm.NMwaitForState(nm1.getNodeId(), RMNodeState.UNHEALTHY); JSONObject json = r.path("ws").path("v1").path("cluster").path("nodes") diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm index 98e19b2c51..8fe515babe 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm @@ -193,6 +193,10 @@ Hadoop MapReduce Next Generation - Cluster Setup | | | ACLs are of for . | | | | Defaults to special value of <<*>> which means . | | | | Special value of just means no one has access. | +*-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | | | +| | | Configuration to enable or disable log aggregation | *-------------------------+-------------------------+------------------------+ @@ -260,10 +264,6 @@ Hadoop MapReduce Next Generation - Cluster Setup | | are written. | | | | | Multiple paths help spread disk i/o. | *-------------------------+-------------------------+------------------------+ -| <<>> | | | -| | | | -| | | Configuration to enable or disable log aggregation | -*-------------------------+-------------------------+------------------------+ | <<>> | | | | | <10800> | | | | | Default time (in seconds) to retain log files on the NodeManager |
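
Taken together, the core of the fix is timing: in secure mode the RM previously cancelled an application's delegation tokens as soon as the application finished, which broke log aggregation on NodeManagers that still needed those tokens to write logs to HDFS. With this patch the RM defers cancellation (DelegationTokenRenewer.applicationFinished plus the DelayedTokenRemovalRunnable thread), and each NodeManager reports keep-alive application IDs in its heartbeat until the application is removed from its context, with the next keep-alive scheduled at a random point between 0.7 and 0.9 of the NM expiry interval so it always lands before the RM's delayed cancellation. A minimal sketch of that scheduling arithmetic, isolated from NodeStatusUpdaterImpl.trackAppForKeepAlive for illustration (class name and values are illustrative, not part of the patch):

    import java.util.Random;

    // Illustrative only: reproduces the keep-alive scheduling formula from
    // NodeStatusUpdaterImpl.trackAppForKeepAlive in this patch.
    public class KeepAliveScheduleSketch {
      public static void main(String[] args) {
        // yarn.nm.liveness-monitor.expiry-interval-ms (default 600000)
        long tokenRemovalDelayMs = 600000L;
        Random keepAliveDelayRandom = new Random();
        // Next keep-alive request falls between 0.7 and ~0.9 of the removal
        // delay, keeping the app's tokens alive while logs are aggregated.
        long nextTime = System.currentTimeMillis()
            + (long) (0.7 * tokenRemovalDelayMs
                + (0.2 * tokenRemovalDelayMs
                    * keepAliveDelayRandom.nextInt(100)) / 100);
        System.out.println("Next keep-alive heartbeat due at " + nextTime);
      }
    }
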