diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a212d45099..76ef215122 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -310,6 +310,9 @@ Release 2.2.0 - UNRELEASED HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop. (Tsuyoshi OZAWA vi Colin Patrick McCabe) + HADOOP-9618. Add thread which detects GC pauses. + (Todd Lipcon via Colin Patrick McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JvmPauseMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JvmPauseMonitor.java new file mode 100644 index 0000000000..f7932a6c8b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JvmPauseMonitor.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Class which sets up a simple thread which runs in a loop sleeping + * for a short interval of time. If the sleep takes significantly longer + * than its target time, it implies that the JVM or host machine has + * paused processing, which may cause other problems. If such a pause is + * detected, the thread logs a message. + */ +@InterfaceAudience.Private +public class JvmPauseMonitor { + private static final Log LOG = LogFactory.getLog( + JvmPauseMonitor.class); + + /** The target sleep time */ + private static final long SLEEP_INTERVAL_MS = 500; + + /** log WARN if we detect a pause longer than this threshold */ + private final long warnThresholdMs; + private static final String WARN_THRESHOLD_KEY = + "jvm.pause.warn-threshold.ms"; + private static final long WARN_THRESHOLD_DEFAULT = 10000; + + /** log INFO if we detect a pause longer than this threshold */ + private final long infoThresholdMs; + private static final String INFO_THRESHOLD_KEY = + "jvm.pause.info-threshold.ms"; + private static final long INFO_THRESHOLD_DEFAULT = 1000; + + + private Thread monitorThread; + private volatile boolean shouldRun = true; + + public JvmPauseMonitor(Configuration conf) { + this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); + this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); + } + + public void start() { + Preconditions.checkState(monitorThread == null, + "Already started"); + monitorThread = new Daemon(new Monitor()); + monitorThread.start(); + } + + public void stop() { + shouldRun = false; + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private String formatMessage(long extraSleepTime, + Map gcTimesAfterSleep, + Map gcTimesBeforeSleep) { + + Set gcBeanNames = Sets.intersection( + gcTimesAfterSleep.keySet(), + gcTimesBeforeSleep.keySet()); + List gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract( + gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + + diff.toString()); + } + } + + String ret = "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + extraSleepTime + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += Joiner.on("\n").join(gcDiffs); + } + return ret; + } + + private Map getGcTimes() { + Map map = Maps.newHashMap(); + List gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, + this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + private class Monitor implements Runnable { + @Override + public void run() { + Stopwatch sw = new Stopwatch(); + Map gcTimesBeforeSleep = getGcTimes(); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + return; + } + long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS; + Map gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs) { + LOG.warn(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs) { + LOG.info(formatMessage( + extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + + gcTimesBeforeSleep = gcTimesAfterSleep; + } + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + * + * This main function just leaks memory into a list. Running this class + * with a 1GB heap will very quickly go into "GC hell" and result in + * log messages about the GC pauses. + */ + public static void main(String []args) throws Exception { + new JvmPauseMonitor(new Configuration()).start(); + List list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ab6170dd97..6c358f755b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -169,6 +169,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -280,6 +281,8 @@ public static InetSocketAddress createSocketAddr(String target) { // For InterDataNodeProtocol public RPC.Server ipcServer; + private JvmPauseMonitor pauseMonitor; + private SecureResources secureResources = null; private AbstractList dataDirs; private Configuration conf; @@ -739,6 +742,8 @@ void startDataNode(Configuration conf, registerMXBean(); initDataXceiver(conf); startInfoServer(conf); + pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); // BlockPoolTokenSecretManager is required to create ipc server. this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager(); @@ -1221,6 +1226,9 @@ public void shutdown() { if (ipcServer != null) { ipcServer.stop(); } + if (pauseMonitor != null) { + pauseMonitor.stop(); + } if (dataXceiverServer != null) { ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 32a6b6ee29..717f07fe87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -76,6 +76,7 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; @@ -254,6 +255,8 @@ public long getProtocolVersion(String protocol, private List plugins; private NameNodeRpcServer rpcServer; + + private JvmPauseMonitor pauseMonitor; /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. **/ @@ -461,6 +464,9 @@ protected void initialize(Configuration conf) throws IOException { } else { validateConfigurationSettingsOrAbort(conf); } + + pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); startCommonServices(conf); } @@ -541,6 +547,7 @@ private void startCommonServices(Configuration conf) throws IOException { private void stopCommonServices() { if(namesystem != null) namesystem.close(); if(rpcServer != null) rpcServer.stop(); + if (pauseMonitor != null) pauseMonitor.stop(); if (plugins != null) { for (ServicePlugin p : plugins) { try {