From 7fc6ad661d4723cc2ea1df1ff0c4611d5f534f9e Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 17 Aug 2012 20:29:38 +0000 Subject: [PATCH] YARN-25. remove old aggregated logs (Robert Evans via tgraves) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1374424 13f79535-47bb-0310-9956-ffa450edef68 --- .../mapreduce/v2/hs/JobHistoryServer.java | 4 + hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 8 + .../AggregatedLogDeletionService.java | 156 ++++++++++++++++++ .../yarn/webapp/log/AggregatedLogsBlock.java | 18 ++ .../yarn/webapp/log/AggregatedLogsPage.java | 18 ++ .../src/main/resources/yarn-default.xml | 7 + .../TestAggregatedLogDeletionService.java | 131 +++++++++++++++ .../logaggregation/LogAggregationService.java | 4 +- .../src/site/apt/ClusterSetup.apt.vm | 13 ++ 10 files changed, 359 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index cdf1f380d9..1eb62b0567 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService; import org.apache.hadoop.yarn.service.CompositeService; /****************************************************************** @@ -53,6 +54,7 @@ public class JobHistoryServer extends CompositeService { private HistoryClientService clientService; private JobHistory jobHistoryService; private JHSDelegationTokenSecretManager jhsDTSecretManager; + private AggregatedLogDeletionService aggLogDelService; public JobHistoryServer() { super(JobHistoryServer.class.getName()); @@ -74,8 +76,10 @@ public synchronized void init(Configuration conf) { this.jhsDTSecretManager = createJHSSecretManager(conf); clientService = new HistoryClientService(historyContext, this.jhsDTSecretManager); + aggLogDelService = new AggregatedLogDeletionService(); addService(jobHistoryService); addService(clientService); + addService(aggLogDelService); super.init(config); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3fb3739bc9..e31c3a64e9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -47,3 +47,5 @@ Release 0.23.3 - Unreleased YARN-14. Symlinks to peer distributed cache files no longer work (Jason Lowe via bobby) + YARN-25. remove old aggregated logs (Robert Evans via tgraves) + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c68fde287d..816df428d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -353,6 +353,14 @@ public class YarnConfiguration extends Configuration { + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; + /** + * How long to wait before deleting aggregated logs, -1 disables. + * Be careful set this too small and you will spam the name node. + */ + public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX + + "log-aggregation.retain-seconds"; + public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; + /** * Number of seconds to retain logs on the NodeManager. Only applicable if Log * aggregation is disabled diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java new file mode 100644 index 0000000000..9fbcae9989 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -0,0 +1,156 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.logaggregation; + +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.service.AbstractService; + +public class AggregatedLogDeletionService extends AbstractService { + private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); + + private Timer timer = null; + + static class LogDeletionTask extends TimerTask { + private Configuration conf; + private long retentionMillis; + private String suffix = null; + private Path remoteRootLogDir = null; + + public LogDeletionTask(Configuration conf, long retentionSecs) { + this.conf = conf; + this.retentionMillis = retentionSecs * 1000; + this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + this.remoteRootLogDir = + new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + } + + @Override + public void run() { + long cutoffMillis = System.currentTimeMillis() - retentionMillis; + LOG.info("aggregated log deletion started."); + try { + FileSystem fs = remoteRootLogDir.getFileSystem(conf); + + for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { + if(userDir.isDirectory()) { + Path userDirPath = new Path(userDir.getPath(), suffix); + deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs); + } + } + } catch (IOException e) { + logIOException("Error reading root log dir this deletion " + + "attempt is being aborted", e); + } + LOG.info("aggregated log deletion finished."); + } + + private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, + FileSystem fs) { + try { + for(FileStatus appDir : fs.listStatus(dir)) { + if(appDir.isDirectory() && + appDir.getModificationTime() < cutoffMillis) { + if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) { + try { + LOG.info("Deleting aggregated logs in "+appDir.getPath()); + fs.delete(appDir.getPath(), true); + } catch (IOException e) { + logIOException("Could not delete "+appDir.getPath(), e); + } + } + } + } + } catch (IOException e) { + logIOException("Could not read the contents of " + dir, e); + } + } + + private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis, + FileSystem fs) { + boolean shouldDelete = true; + try { + for(FileStatus node: fs.listStatus(dir.getPath())) { + if(node.getModificationTime() >= cutoffMillis) { + shouldDelete = false; + break; + } + } + } catch(IOException e) { + logIOException("Error reading the contents of " + dir.getPath(), e); + shouldDelete = false; + } + return shouldDelete; + } + } + + private static void logIOException(String comment, IOException e) { + if(e instanceof AccessControlException) { + String message = e.getMessage(); + //TODO fix this after HADOOP-8661 + message = message.split("\n")[0]; + LOG.warn(comment + " " + message); + } else { + LOG.error(comment, e); + } + } + + public AggregatedLogDeletionService() { + super(AggregatedLogDeletionService.class.getName()); + } + + public void start() { + Configuration conf = getConfig(); + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + //Log aggregation is not enabled so don't bother + return; + } + long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); + if(retentionSecs < 0) { + LOG.info("Log Aggregation deletion is disabled because retention is" + + " too small (" + retentionSecs + ")"); + return; + } + TimerTask task = new LogDeletionTask(conf, retentionSecs); + timer = new Timer(); + timer.scheduleAtFixedRate(task, 0, retentionSecs * 1000); + super.start(); + } + + @Override + public void stop() { + if(timer != null) { + timer.cancel(); + } + super.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index f43b5745a2..14455320d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.hadoop.yarn.webapp.log; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java index add4fc6853..8d1c855e38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsPage.java @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.hadoop.yarn.webapp.log; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index be4bcd1e54..0408ee1644 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -367,6 +367,13 @@ yarn.log-aggregation-enable false + + + How long to keep aggregation logs before deleting them. -1 disables. + Be careful set this too small and you will spam the name node. + yarn.log-aggregation.retain-seconds + -1 + Time in seconds to retain user logs. Only applicable if diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java new file mode 100644 index 0000000000..c1cf9af360 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -0,0 +1,131 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.logaggregation; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestAggregatedLogDeletionService { + + @Test + public void testDeletion() throws Exception { + long now = System.currentTimeMillis(); + long toDeleteTime = now - (2000*1000); + long toKeepTime = now - (1500*1000); + + String root = "mockfs://foo/"; + String remoteRootLogDir = root+"tmp/logs"; + String suffix = "logs"; + Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + + Path rootPath = new Path(root); + FileSystem rootFs = rootPath.getFileSystem(conf); + FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); + + Path remoteRootLogPath = new Path(remoteRootLogDir); + + Path userDir = new Path(remoteRootLogPath, "me"); + FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); + + when(mockFs.listStatus(remoteRootLogPath)).thenReturn( + new FileStatus[]{userDirStatus}); + + Path userLogDir = new Path(userDir, suffix); + Path app1Dir = new Path(userLogDir, "application_1_1"); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); + + Path app2Dir = new Path(userLogDir, "application_1_2"); + FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); + + Path app3Dir = new Path(userLogDir, "application_1_3"); + FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); + + Path app4Dir = new Path(userLogDir, "application_1_4"); + FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + + when(mockFs.listStatus(userLogDir)).thenReturn( + new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus}); + + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[]{}); + + Path app2Log1 = new Path(app2Dir, "host1"); + FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); + + Path app2Log2 = new Path(app2Dir, "host2"); + FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2); + + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[]{app2Log1Status, app2Log2Status}); + + Path app3Log1 = new Path(app3Dir, "host1"); + FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1); + + Path app3Log2 = new Path(app3Dir, "host2"); + FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2); + + when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :(")); + + when(mockFs.listStatus(app3Dir)).thenReturn( + new FileStatus[]{app3Log1Status, app3Log2Status}); + + Path app4Log1 = new Path(app4Dir, "host1"); + FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); + + Path app4Log2 = new Path(app4Dir, "host2"); + FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2); + + when(mockFs.listStatus(app4Dir)).thenReturn( + new FileStatus[]{app4Log1Status, app4Log2Status}); + + AggregatedLogDeletionService.LogDeletionTask task = + new AggregatedLogDeletionService.LogDeletionTask(conf, 1800); + + task.run(); + + verify(mockFs).delete(app1Dir, true); + verify(mockFs, times(0)).delete(app2Dir, true); + verify(mockFs).delete(app3Dir, true); + verify(mockFs).delete(app4Dir, true); + } + + + static class MockFileSystem extends FilterFileSystem { + MockFileSystem() { + super(mock(FileSystem.class)); + } + public void initialize(URI name, Configuration conf) throws IOException {} + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index d29e6a0cf4..28e098ebea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -70,7 +70,7 @@ public class LogAggregationService extends AbstractService implements /* * Expected deployment TLD will be 1777, owner=, group= App dirs will be created as 750, + * Group to which NMOwner belongs> App dirs will be created as 770, * owner=, group=: so that the owner and can * access / modify the files. * should obviously be a limited access group. @@ -85,7 +85,7 @@ public class LogAggregationService extends AbstractService implements * Permissions for the Application directory. */ private static final FsPermission APP_DIR_PERMISSIONS = FsPermission - .createImmutable((short) 0750); + .createImmutable((short) 0770); private final Context context; private final DeletionService deletionService; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm index ebdc546b15..673125506f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm @@ -314,6 +314,19 @@ Hadoop MapReduce Next Generation - Cluster Setup | | | Shuffle service that needs to be set for Map Reduce applications. | *-------------------------+-------------------------+------------------------+ + * Configurations for History Server (Needs to be moved elsewhere): + +*-------------------------+-------------------------+------------------------+ +|| Parameter || Value || Notes | +*-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | <-1> | | +| | | How long to keep aggregation logs before deleting them. -1 disables. | +| | | Be careful, set this too small and you will spam the name node. | +*-------------------------+-------------------------+------------------------+ + + + * <<>> * Configurations for MapReduce Applications: