From e324c20692036c59dcdc687c2a13e87a3789157d Mon Sep 17 00:00:00 2001 From: Hemanth Yamijala Date: Mon, 7 Sep 2009 08:16:09 +0000 Subject: [PATCH] HADOOP-6230. Moved process tree and memory calculator related classes from Common to Map/Reduce. Contributed by Vinod Kumar Vavilapalli. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@812031 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../util/LinuxMemoryCalculatorPlugin.java | 132 ----- .../hadoop/util/MemoryCalculatorPlugin.java | 74 --- .../org/apache/hadoop/util/ProcessTree.java | 313 ----------- .../hadoop/util/ProcfsBasedProcessTree.java | 526 ------------------ src/java/org/apache/hadoop/util/Shell.java | 2 +- .../util/TestProcfsBasedProcessTree.java | 468 ---------------- 7 files changed, 4 insertions(+), 1514 deletions(-) delete mode 100644 src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java delete mode 100644 src/java/org/apache/hadoop/util/MemoryCalculatorPlugin.java delete mode 100644 src/java/org/apache/hadoop/util/ProcessTree.java delete mode 100644 src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java delete mode 100644 src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java diff --git a/CHANGES.txt b/CHANGES.txt index 84d9112658..96b8111cfd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -78,6 +78,9 @@ Trunk (unreleased changes) FileNotFoundException if the directory does not exist, rather than letting this be implementation-specific. (Jakob Homan via cdouglas) + HADOOP-6230. Moved process tree and memory calculator related classes + from Common to Map/Reduce. (Vinod Kumar Vavilapalli via yhemanth) + NEW FEATURES HADOOP-4268. Change fsck to use ClientProtocol methods so that the diff --git a/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java b/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java deleted file mode 100644 index 3870a4715a..0000000000 --- a/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.util; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Plugin to calculate virtual and physical memories on Linux systems. - */ -public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin { - private static final Log LOG = - LogFactory.getLog(LinuxMemoryCalculatorPlugin.class); - - /** - * proc's meminfo virtual file has keys-values in the format - * "key:[ \t]*value[ \t]kB". - */ - private static final String PROCFS_MEMFILE = "/proc/meminfo"; - private static final Pattern PROCFS_MEMFILE_FORMAT = - Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB"); - - // We just need the values for the keys MemTotal and SwapTotal - private static final String MEMTOTAL_STRING = "MemTotal"; - private static final String SWAPTOTAL_STRING = "SwapTotal"; - - private long ramSize = 0; - private long swapSize = 0; - - boolean readMemInfoFile = false; - - private void readProcMemInfoFile() { - - if (readMemInfoFile) { - return; - } - - // Read "/proc/memInfo" file - BufferedReader in = null; - FileReader fReader = null; - try { - fReader = new FileReader(PROCFS_MEMFILE); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - return; - } - - Matcher mat = null; - - try { - String str = in.readLine(); - while (str != null) { - mat = PROCFS_MEMFILE_FORMAT.matcher(str); - if (mat.find()) { - if (mat.group(1).equals(MEMTOTAL_STRING)) { - ramSize = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(SWAPTOTAL_STRING)) { - swapSize = Long.parseLong(mat.group(2)); - } - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - - readMemInfoFile = true; - } - - /** {@inheritDoc} */ - @Override - public long getPhysicalMemorySize() { - readProcMemInfoFile(); - return ramSize * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getVirtualMemorySize() { - readProcMemInfoFile(); - return (ramSize + swapSize) * 1024; - } - - /** - * Test the {@link LinuxMemoryCalculatorPlugin} - * - * @param args - */ - public static void main(String[] args) { - LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin(); - System.out.println("Physical memory Size(bytes) : " - + plugin.getPhysicalMemorySize()); - System.out.println("Total Virtual memory Size(bytes) : " - + plugin.getVirtualMemorySize()); - } -} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/util/MemoryCalculatorPlugin.java b/src/java/org/apache/hadoop/util/MemoryCalculatorPlugin.java deleted file mode 100644 index a767b663d4..0000000000 --- a/src/java/org/apache/hadoop/util/MemoryCalculatorPlugin.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.util; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; - -/** - * Plugin to calculate virtual and physical memories on the system. - * - */ -public abstract class MemoryCalculatorPlugin extends Configured { - - /** - * Obtain the total size of the virtual memory present in the system. - * - * @return virtual memory size in bytes. - */ - public abstract long getVirtualMemorySize(); - - /** - * Obtain the total size of the physical memory present in the system. - * - * @return physical memory size bytes. - */ - public abstract long getPhysicalMemorySize(); - - /** - * Get the MemoryCalculatorPlugin from the class name and configure it. If - * class name is null, this method will try and return a memory calculator - * plugin available for this system. - * - * @param clazz class-name - * @param conf configure the plugin with this. - * @return MemoryCalculatorPlugin - */ - public static MemoryCalculatorPlugin getMemoryCalculatorPlugin( - Class clazz, Configuration conf) { - - if (clazz != null) { - return ReflectionUtils.newInstance(clazz, conf); - } - - // No class given, try a os specific class - try { - String osName = System.getProperty("os.name"); - if (osName.startsWith("Linux")) { - return new LinuxMemoryCalculatorPlugin(); - } - } catch (SecurityException se) { - // Failed to get Operating System name. - return null; - } - - // Not supported on this system. - return null; - } -} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/util/ProcessTree.java b/src/java/org/apache/hadoop/util/ProcessTree.java deleted file mode 100644 index 9242046932..0000000000 --- a/src/java/org/apache/hadoop/util/ProcessTree.java +++ /dev/null @@ -1,313 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.util; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.util.Shell.ExitCodeException; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; - -/** - * Process tree related operations - */ -public class ProcessTree { - - private static final Log LOG = LogFactory.getLog(ProcessTree.class); - - public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L; - - public static final boolean isSetsidAvailable = isSetsidSupported(); - private static boolean isSetsidSupported() { - ShellCommandExecutor shexec = null; - boolean setsidSupported = true; - try { - String[] args = {"setsid", "bash", "-c", "echo $$"}; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (IOException ioe) { - LOG.warn("setsid is not available on this machine. So not using it."); - setsidSupported = false; - } finally { // handle the exit code - LOG.info("setsid exited with exit code " + shexec.getExitCode()); - return setsidSupported; - } - } - - /** - * Destroy the process-tree. - * @param pid process id of the root process of the subtree of processes - * to be killed - * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL - * after sending SIGTERM - * @param isProcessGroup pid is a process group leader or not - * @param inBackground Process is to be killed in the back ground with - * a separate thread - */ - public static void destroy(String pid, long sleeptimeBeforeSigkill, - boolean isProcessGroup, boolean inBackground) { - if(isProcessGroup) { - destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground); - } - else { - //TODO: Destroy all the processes in the subtree in this case also. - // For the time being, killing only the root process. - destroyProcess(pid, sleeptimeBeforeSigkill, inBackground); - } - } - - /** Destroy the process. - * @param pid Process id of to-be-killed-process - * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL - * after sending SIGTERM - * @param inBackground Process is to be killed in the back ground with - * a separate thread - */ - protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill, - boolean inBackground) { - terminateProcess(pid); - sigKill(pid, false, sleeptimeBeforeSigkill, inBackground); - } - - /** Destroy the process group. - * @param pgrpId Process group id of to-be-killed-processes - * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL - * after sending SIGTERM - * @param inBackground Process group is to be killed in the back ground with - * a separate thread - */ - protected static void destroyProcessGroup(String pgrpId, - long sleeptimeBeforeSigkill, boolean inBackground) { - terminateProcessGroup(pgrpId); - sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground); - } - - /** - * Sends terminate signal to the process, allowing it to gracefully exit. - * - * @param pid pid of the process to be sent SIGTERM - */ - public static void terminateProcess(String pid) { - ShellCommandExecutor shexec = null; - try { - String[] args = { "kill", pid }; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (IOException ioe) { - LOG.warn("Error executing shell command " + ioe); - } finally { - LOG.info("Killing process " + pid + - " with SIGTERM. Exit code " + shexec.getExitCode()); - } - } - - /** - * Sends terminate signal to all the process belonging to the passed process - * group, allowing the group to gracefully exit. - * - * @param pgrpId process group id - */ - public static void terminateProcessGroup(String pgrpId) { - ShellCommandExecutor shexec = null; - try { - String[] args = { "kill", "--", "-" + pgrpId }; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (IOException ioe) { - LOG.warn("Error executing shell command " + ioe); - } finally { - LOG.info("Killing all processes in the process group " + pgrpId + - " with SIGTERM. Exit code " + shexec.getExitCode()); - } - } - - /** - * Kills the process(OR process group) by sending the signal SIGKILL - * in the current thread - * @param pid Process id(OR process group id) of to-be-deleted-process - * @param isProcessGroup Is pid a process group id of to-be-deleted-processes - * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after - * sending SIGTERM - */ - private static void sigKillInCurrentThread(String pid, boolean isProcessGroup, - long sleepTimeBeforeSigKill) { - // Kill the subprocesses of root process(even if the root process is not - // alive) if process group is to be killed. - if (isProcessGroup || ProcessTree.isAlive(pid)) { - try { - // Sleep for some time before sending SIGKILL - Thread.sleep(sleepTimeBeforeSigKill); - } catch (InterruptedException i) { - LOG.warn("Thread sleep is interrupted."); - } - if(isProcessGroup) { - killProcessGroup(pid); - } else { - killProcess(pid); - } - } - } - - - /** Kills the process(OR process group) by sending the signal SIGKILL - * @param pid Process id(OR process group id) of to-be-deleted-process - * @param isProcessGroup Is pid a process group id of to-be-deleted-processes - * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL - * after sending SIGTERM - * @param inBackground Process is to be killed in the back ground with - * a separate thread - */ - private static void sigKill(String pid, boolean isProcessGroup, - long sleeptimeBeforeSigkill, boolean inBackground) { - - if(inBackground) { // use a separate thread for killing - SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup, - sleeptimeBeforeSigkill); - sigKillThread.setDaemon(true); - sigKillThread.start(); - } - else { - sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill); - } - } - - /** - * Sends kill signal to process, forcefully terminating the process. - * - * @param pid process id - */ - public static void killProcess(String pid) { - - //If process tree is not alive then return immediately. - if(!ProcessTree.isAlive(pid)) { - return; - } - String[] args = { "kill", "-9", pid }; - ShellCommandExecutor shexec = new ShellCommandExecutor(args); - try { - shexec.execute(); - } catch (IOException e) { - LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ - StringUtils.stringifyException(e)); - } finally { - LOG.info("Killing process " + pid + " with SIGKILL. Exit code " - + shexec.getExitCode()); - } - } - - /** - * Sends kill signal to all process belonging to same process group, - * forcefully terminating the process group. - * - * @param pgrpId process group id - */ - public static void killProcessGroup(String pgrpId) { - - //If process tree is not alive then return immediately. - if(!ProcessTree.isProcessGroupAlive(pgrpId)) { - return; - } - - String[] args = { "kill", "-9", "-"+pgrpId }; - ShellCommandExecutor shexec = new ShellCommandExecutor(args); - try { - shexec.execute(); - } catch (IOException e) { - LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ - StringUtils.stringifyException(e)); - } finally { - LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code " - + shexec.getExitCode()); - } - } - - /** - * Is the process with PID pid still alive? - * This method assumes that isAlive is called on a pid that was alive not - * too long ago, and hence assumes no chance of pid-wrapping-around. - * - * @param pid pid of the process to check. - * @return true if process is alive. - */ - public static boolean isAlive(String pid) { - ShellCommandExecutor shexec = null; - try { - String[] args = { "kill", "-0", pid }; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (ExitCodeException ee) { - return false; - } catch (IOException ioe) { - LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); - return false; - } - return (shexec.getExitCode() == 0 ? true : false); - } - - /** - * Is the process group with still alive? - * - * This method assumes that isAlive is called on a pid that was alive not - * too long ago, and hence assumes no chance of pid-wrapping-around. - * - * @param pgrpId process group id - * @return true if any of process in group is alive. - */ - public static boolean isProcessGroupAlive(String pgrpId) { - ShellCommandExecutor shexec = null; - try { - String[] args = { "kill", "-0", "-"+pgrpId }; - shexec = new ShellCommandExecutor(args); - shexec.execute(); - } catch (ExitCodeException ee) { - return false; - } catch (IOException ioe) { - LOG.warn("Error executing shell command " - + Arrays.toString(shexec.getExecString()) + ioe); - return false; - } - return (shexec.getExitCode() == 0 ? true : false); - } - - - /** - * Helper thread class that kills process-tree with SIGKILL in background - */ - static class SigKillThread extends Thread { - private String pid = null; - private boolean isProcessGroup = false; - - private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL; - - private SigKillThread(String pid, boolean isProcessGroup, long interval) { - this.pid = pid; - this.isProcessGroup = isProcessGroup; - this.setName(this.getClass().getName() + "-" + pid); - sleepTimeBeforeSigKill = interval; - } - - public void run() { - sigKillInCurrentThread(pid, isProcessGroup, sleepTimeBeforeSigKill); - } - } -} diff --git a/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java b/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java deleted file mode 100644 index 901bcc683c..0000000000 --- a/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java +++ /dev/null @@ -1,526 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.util; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.LinkedList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A Proc file-system based ProcessTree. Works only on Linux. - */ -public class ProcfsBasedProcessTree extends ProcessTree { - - private static final Log LOG = LogFactory - .getLog(ProcfsBasedProcessTree.class); - - private static final String PROCFS = "/proc/"; - - private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern - .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}"); - - // to enable testing, using this variable which can be configured - // to a test directory. - private String procfsDir; - - private Integer pid = -1; - private boolean setsidUsed = false; - private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL; - - private Map processTree = new HashMap(); - - public ProcfsBasedProcessTree(String pid) { - this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL); - } - - public ProcfsBasedProcessTree(String pid, boolean setsidUsed, - long sigkillInterval) { - this(pid, setsidUsed, sigkillInterval, PROCFS); - } - - /** - * Build a new process tree rooted at the pid. - * - * This method is provided mainly for testing purposes, where - * the root of the proc file system can be adjusted. - * - * @param pid root of the process tree - * @param setsidUsed true, if setsid was used for the root pid - * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL - * when killing a process tree - * @param procfsDir the root of a proc file system - only used for testing. - */ - public ProcfsBasedProcessTree(String pid, boolean setsidUsed, - long sigkillInterval, String procfsDir) { - this.pid = getValidPID(pid); - this.setsidUsed = setsidUsed; - sleeptimeBeforeSigkill = sigkillInterval; - this.procfsDir = procfsDir; - } - - /** - * Sets SIGKILL interval - * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree( - * String, boolean, long)} instead - * @param interval The time to wait before sending SIGKILL - * after sending SIGTERM - */ - @Deprecated - public void setSigKillInterval(long interval) { - sleeptimeBeforeSigkill = interval; - } - - /** - * Checks if the ProcfsBasedProcessTree is available on this system. - * - * @return true if ProcfsBasedProcessTree is available. False otherwise. - */ - public static boolean isAvailable() { - try { - String osName = System.getProperty("os.name"); - if (!osName.startsWith("Linux")) { - LOG.info("ProcfsBasedProcessTree currently is supported only on " - + "Linux."); - return false; - } - } catch (SecurityException se) { - LOG.warn("Failed to get Operating System name. " + se); - return false; - } - return true; - } - - /** - * Get the process-tree with latest state. If the root-process is not alive, - * an empty tree will be returned. - * - * @return the process-tree with latest state. - */ - public ProcfsBasedProcessTree getProcessTree() { - if (pid != -1) { - // Get the list of processes - List processList = getProcessList(); - - Map allProcessInfo = new HashMap(); - - // cache the processTree to get the age for processes - Map oldProcs = - new HashMap(processTree); - processTree.clear(); - - ProcessInfo me = null; - for (Integer proc : processList) { - // Get information for each process - ProcessInfo pInfo = new ProcessInfo(proc); - if (constructProcessInfo(pInfo, procfsDir) != null) { - allProcessInfo.put(proc, pInfo); - if (proc.equals(this.pid)) { - me = pInfo; // cache 'me' - processTree.put(proc, pInfo); - } - } - } - - if (me == null) { - return this; - } - - // Add each process to its parent. - for (Map.Entry entry : allProcessInfo.entrySet()) { - Integer pID = entry.getKey(); - if (pID != 1) { - ProcessInfo pInfo = entry.getValue(); - ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid()); - if (parentPInfo != null) { - parentPInfo.addChild(pInfo); - } - } - } - - // now start constructing the process-tree - LinkedList pInfoQueue = new LinkedList(); - pInfoQueue.addAll(me.getChildren()); - while (!pInfoQueue.isEmpty()) { - ProcessInfo pInfo = pInfoQueue.remove(); - if (!processTree.containsKey(pInfo.getPid())) { - processTree.put(pInfo.getPid(), pInfo); - } - pInfoQueue.addAll(pInfo.getChildren()); - } - - // update age values. - for (Map.Entry procs : processTree.entrySet()) { - ProcessInfo oldInfo = oldProcs.get(procs.getKey()); - if (oldInfo != null) { - if (procs.getValue() != null) { - procs.getValue().updateAge(oldInfo); - } - } - } - - if (LOG.isDebugEnabled()) { - // Log.debug the ProcfsBasedProcessTree - LOG.debug(this.toString()); - } - } - return this; - } - - /** - * Is the root-process alive? - * - * @return true if the root-process is alive, false otherwise. - */ - public boolean isAlive() { - if (pid == -1) { - return false; - } else { - return isAlive(pid.toString()); - } - } - - /** - * Is any of the subprocesses in the process-tree alive? - * - * @return true if any of the processes in the process-tree is - * alive, false otherwise. - */ - public boolean isAnyProcessInTreeAlive() { - for (Integer pId : processTree.keySet()) { - if (isAlive(pId.toString())) { - return true; - } - } - return false; - } - - /** Verify that the given process id is same as its process group id. - * @param pidStr Process id of the to-be-verified-process - */ - private static boolean assertPidPgrpidForMatch(String pidStr) { - Integer pId = Integer.parseInt(pidStr); - // Get information for this process - ProcessInfo pInfo = new ProcessInfo(pId); - pInfo = constructProcessInfo(pInfo); - //make sure that pId and its pgrpId match - if (!pInfo.getPgrpId().equals(pId)) { - LOG.warn("Unexpected: Process with PID " + pId + - " is not a process group leader."); - return false; - } - if (LOG.isDebugEnabled()) { - LOG.debug(pId + " is a process group leader, as expected."); - } - return true; - } - - /** Make sure that the given pid is a process group leader and then - * destroy the process group. - * @param pgrpId Process group id of to-be-killed-processes - * @param interval The time to wait before sending SIGKILL - * after sending SIGTERM - * @param inBackground Process is to be killed in the back ground with - * a separate thread - */ - public static void assertAndDestroyProcessGroup(String pgrpId, long interval, - boolean inBackground) - throws IOException { - // Make sure that the pid given is a process group leader - if (!assertPidPgrpidForMatch(pgrpId)) { - throw new IOException("Process with PID " + pgrpId + - " is not a process group leader."); - } - destroyProcessGroup(pgrpId, interval, inBackground); - } - - /** - * Destroy the process-tree. - */ - public void destroy() { - destroy(true); - } - - /** - * Destroy the process-tree. - * @param inBackground Process is to be killed in the back ground with - * a separate thread - */ - public void destroy(boolean inBackground) { - LOG.debug("Killing ProcfsBasedProcessTree of " + pid); - if (pid == -1) { - return; - } - - if (isAlive(pid.toString())) { - if (isSetsidAvailable && setsidUsed) { - // In this case, we know that pid got created using setsid. So kill the - // whole processGroup. - try { - assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill, - inBackground); - } catch (IOException e) { - LOG.warn(StringUtils.stringifyException(e)); - } - } - else { - //TODO: Destroy all the processes in the subtree in this case also. - // For the time being, killing only the root process. - destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground); - } - } - } - - /** - * Get the cumulative virtual memory used by all the processes in the - * process-tree. - * - * @return cumulative virtual memory used by the process-tree in bytes. - */ - public long getCumulativeVmem() { - // include all processes.. all processes will be older than 0. - return getCumulativeVmem(0); - } - - /** - * Get the cumulative virtual memory used by all the processes in the - * process-tree that are older than the passed in age. - * - * @param olderThanAge processes above this age are included in the - * memory addition - * @return cumulative virtual memory used by the process-tree in bytes, - * for processes older than this age. - */ - public long getCumulativeVmem(int olderThanAge) { - long total = 0; - for (ProcessInfo p : processTree.values()) { - if ((p != null) && (p.getAge() > olderThanAge)) { - total += p.getVmem(); - } - } - return total; - } - - private static Integer getValidPID(String pid) { - Integer retPid = -1; - try { - retPid = Integer.parseInt(pid); - if (retPid <= 0) { - retPid = -1; - } - } catch (NumberFormatException nfe) { - retPid = -1; - } - return retPid; - } - - /** - * Get the list of all processes in the system. - */ - private List getProcessList() { - String[] processDirs = (new File(procfsDir)).list(); - List processList = new ArrayList(); - - for (String dir : processDirs) { - try { - int pd = Integer.parseInt(dir); - if ((new File(procfsDir, dir)).isDirectory()) { - processList.add(Integer.valueOf(pd)); - } - } catch (NumberFormatException n) { - // skip this directory - } catch (SecurityException s) { - // skip this process - } - } - return processList; - } - - /** - * - * Construct the ProcessInfo using the process' PID and procfs and return the - * same. Returns null on failing to read from procfs, - */ - private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) { - return constructProcessInfo(pinfo, PROCFS); - } - - /** - * Construct the ProcessInfo using the process' PID and procfs rooted at the - * specified directory and return the same. It is provided mainly to assist - * testing purposes. - * - * Returns null on failing to read from procfs, - * - * @param pinfo ProcessInfo that needs to be updated - * @param procfsDir root of the proc file system - * @return updated ProcessInfo, null on errors. - */ - private static ProcessInfo constructProcessInfo(ProcessInfo pinfo, - String procfsDir) { - ProcessInfo ret = null; - // Read "procfsDir//stat" file - typically /proc//stat - BufferedReader in = null; - FileReader fReader = null; - try { - File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid())); - fReader = new FileReader(new File(pidDir, "/stat")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // The process vanished in the interim! - return ret; - } - - ret = pinfo; - try { - String str = in.readLine(); // only one line - Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str); - boolean mat = m.find(); - if (mat) { - // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize ) - pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer - .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long - .parseLong(m.group(7))); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - ret = null; - } finally { - // Close the streams - try { - if (fReader != null) { - fReader.close(); - } - try { - if (in != null) { - in.close(); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - - return ret; - } - /** - * Returns a string printing PIDs of process present in the - * ProcfsBasedProcessTree. Output format : [pid pid ..] - */ - public String toString() { - StringBuffer pTree = new StringBuffer("[ "); - for (Integer p : processTree.keySet()) { - pTree.append(p); - pTree.append(" "); - } - return pTree.substring(0, pTree.length()) + "]"; - } - - /** - * - * Class containing information of a process. - * - */ - private static class ProcessInfo { - private Integer pid; // process-id - private String name; // command name - private Integer pgrpId; // process group-id - private Integer ppid; // parent process-id - private Integer sessionId; // session-id - private Long vmem; // virtual memory usage - // how many times has this process been seen alive - private int age; - private List children = new ArrayList(); // list of children - - public ProcessInfo(int pid) { - this.pid = Integer.valueOf(pid); - // seeing this the first time. - this.age = 1; - } - - public Integer getPid() { - return pid; - } - - public String getName() { - return name; - } - - public Integer getPgrpId() { - return pgrpId; - } - - public Integer getPpid() { - return ppid; - } - - public Integer getSessionId() { - return sessionId; - } - - public Long getVmem() { - return vmem; - } - - public int getAge() { - return age; - } - - public boolean isParent(ProcessInfo p) { - if (pid.equals(p.getPpid())) { - return true; - } - return false; - } - - public void updateProcessInfo(String name, Integer ppid, Integer pgrpId, - Integer sessionId, Long vmem) { - this.name = name; - this.ppid = ppid; - this.pgrpId = pgrpId; - this.sessionId = sessionId; - this.vmem = vmem; - } - - public void updateAge(ProcessInfo oldInfo) { - this.age = oldInfo.age + 1; - } - - public boolean addChild(ProcessInfo p) { - return children.add(p); - } - - public List getChildren() { - return children; - } - } -} diff --git a/src/java/org/apache/hadoop/util/Shell.java b/src/java/org/apache/hadoop/util/Shell.java index 2c0ab786b8..f9e176ab42 100644 --- a/src/java/org/apache/hadoop/util/Shell.java +++ b/src/java/org/apache/hadoop/util/Shell.java @@ -363,7 +363,7 @@ public void execute() throws IOException { this.run(); } - protected String[] getExecString() { + public String[] getExecString() { return command; } diff --git a/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java b/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java deleted file mode 100644 index d9c5f10e26..0000000000 --- a/src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java +++ /dev/null @@ -1,468 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.util; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.Random; -import java.util.Vector; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.Shell.ExitCodeException; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; - -import junit.framework.TestCase; - -/** - * A JUnit test to test ProcfsBasedProcessTree. - */ -public class TestProcfsBasedProcessTree extends TestCase { - - private static final Log LOG = LogFactory - .getLog(TestProcfsBasedProcessTree.class); - private static String TEST_ROOT_DIR = new Path(System.getProperty( - "test.build.data", "/tmp")).toString().replace(' ', '+'); - - private ShellCommandExecutor shexec = null; - private String pidFile, lowestDescendant; - private String shellScript; - private static final int N = 6; // Controls the RogueTask - - private class RogueTaskThread extends Thread { - public void run() { - try { - Vector args = new Vector(); - if(ProcessTree.isSetsidAvailable) { - args.add("setsid"); - } - args.add("bash"); - args.add("-c"); - args.add(" echo $$ > " + pidFile + "; sh " + - shellScript + " " + N + ";") ; - shexec = new ShellCommandExecutor(args.toArray(new String[0])); - shexec.execute(); - } catch (ExitCodeException ee) { - LOG.info("Shell Command exit with a non-zero exit code. This is" + - " expected as we are killing the subprocesses of the" + - " task intentionally. " + ee); - } catch (IOException ioe) { - LOG.info("Error executing shell command " + ioe); - } finally { - LOG.info("Exit code: " + shexec.getExitCode()); - } - } - } - - private String getRogueTaskPID() { - File f = new File(pidFile); - while (!f.exists()) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - break; - } - } - - // read from pidFile - return getPidFromPidFile(pidFile); - } - - public void testProcessTree() { - - try { - if (!ProcfsBasedProcessTree.isAvailable()) { - System.out - .println("ProcfsBasedProcessTree is not available on this system. Not testing"); - return; - } - } catch (Exception e) { - LOG.info(StringUtils.stringifyException(e)); - return; - } - // create shell script - Random rm = new Random(); - File tempFile = new File(TEST_ROOT_DIR, this.getName() + "_shellScript_" + - rm.nextInt() + ".sh"); - tempFile.deleteOnExit(); - shellScript = TEST_ROOT_DIR + File.separator + tempFile.getName(); - - // create pid file - tempFile = new File(TEST_ROOT_DIR, this.getName() + "_pidFile_" + - rm.nextInt() + ".pid"); - tempFile.deleteOnExit(); - pidFile = TEST_ROOT_DIR + File.separator + tempFile.getName(); - - lowestDescendant = TEST_ROOT_DIR + File.separator + "lowestDescendantPidFile"; - - // write to shell-script - try { - FileWriter fWriter = new FileWriter(shellScript); - fWriter.write( - "# rogue task\n" + - "sleep 1\n" + - "echo hello\n" + - "if [ $1 -ne 0 ]\n" + - "then\n" + - " sh " + shellScript + " $(($1-1))\n" + - "else\n" + - " echo $$ > " + lowestDescendant + "\n" + - " while true\n do\n" + - " sleep 5\n" + - " done\n" + - "fi"); - fWriter.close(); - } catch (IOException ioe) { - LOG.info("Error: " + ioe); - return; - } - - Thread t = new RogueTaskThread(); - t.start(); - String pid = getRogueTaskPID(); - LOG.info("Root process pid: " + pid); - ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid, - ProcessTree.isSetsidAvailable, - ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); - p = p.getProcessTree(); // initialize - LOG.info("ProcessTree: " + p.toString()); - - File leaf = new File(lowestDescendant); - //wait till lowest descendant process of Rougue Task starts execution - while (!leaf.exists()) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - break; - } - } - - p = p.getProcessTree(); // reconstruct - LOG.info("ProcessTree: " + p.toString()); - - // destroy the map task and all its subprocesses - p.destroy(true/*in the background*/); - - if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone - assertEquals(false, p.isAnyProcessInTreeAlive()); - } - else {// process should be gone - assertFalse("ProcessTree must have been gone", p.isAlive()); - } - // Not able to join thread sometimes when forking with large N. - try { - t.join(2000); - LOG.info("RogueTaskThread successfully joined."); - } catch (InterruptedException ie) { - LOG.info("Interrupted while joining RogueTaskThread."); - } - - // ProcessTree is gone now. Any further calls should be sane. - p = p.getProcessTree(); - assertFalse("ProcessTree must have been gone", p.isAlive()); - assertTrue("Cumulative vmem for the gone-process is " - + p.getCumulativeVmem() + " . It should be zero.", p - .getCumulativeVmem() == 0); - assertTrue(p.toString().equals("[ ]")); - } - - /** - * Get PID from a pid-file. - * - * @param pidFileName - * Name of the pid-file. - * @return the PID string read from the pid-file. Returns null if the - * pidFileName points to a non-existing file or if read fails from the - * file. - */ - public static String getPidFromPidFile(String pidFileName) { - BufferedReader pidFile = null; - FileReader fReader = null; - String pid = null; - - try { - fReader = new FileReader(pidFileName); - pidFile = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - LOG.debug("PidFile doesn't exist : " + pidFileName); - return pid; - } - - try { - pid = pidFile.readLine(); - } catch (IOException i) { - LOG.error("Failed to read from " + pidFileName); - } finally { - try { - if (fReader != null) { - fReader.close(); - } - try { - if (pidFile != null) { - pidFile.close(); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + pidFile); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - return pid; - } - - public static class ProcessStatInfo { - // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 - // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 - // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 - // 4294967295 0 0 17 1 0 0 - String pid; - String name; - String ppid; - String pgrpId; - String session; - String vmem; - - public ProcessStatInfo(String[] statEntries) { - pid = statEntries[0]; - name = statEntries[1]; - ppid = statEntries[2]; - pgrpId = statEntries[3]; - session = statEntries[4]; - vmem = statEntries[5]; - } - - // construct a line that mimics the procfs stat file. - // all unused numerical entries are set to 0. - public String getStatLine() { - return String.format("%s (%s) S %s %s %s 0 0 0" + - " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" + - " 0 0 0 0 0 0 0 0" + - " 0 0 0 0 0", - pid, name, ppid, pgrpId, session, vmem); - } - } - - /** - * A basic test that creates a few process directories and writes - * stat files. Verifies that the virtual memory is correctly - * computed. - * @throws IOException if there was a problem setting up the - * fake procfs directories or files. - */ - public void testVirtualMemoryForProcessTree() throws IOException { - - // test processes - String[] pids = { "100", "200", "300", "400" }; - // create the fake procfs root directory. - File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); - - try { - setupProcfsRootDir(procfsRootDir); - setupPidDirs(procfsRootDir, pids); - - // create stat objects. - // assuming processes 100, 200, 300 are in tree and 400 is not. - ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; - procInfos[0] = new ProcessStatInfo(new String[] - {"100", "proc1", "1", "100", "100", "100000"}); - procInfos[1] = new ProcessStatInfo(new String[] - {"200", "proc2", "100", "100", "100", "200000"}); - procInfos[2] = new ProcessStatInfo(new String[] - {"300", "proc3", "200", "100", "100", "300000"}); - procInfos[3] = new ProcessStatInfo(new String[] - {"400", "proc4", "1", "400", "400", "400000"}); - - writeStatFiles(procfsRootDir, pids, procInfos); - - // crank up the process tree class. - ProcfsBasedProcessTree processTree = - new ProcfsBasedProcessTree("100", true, 100L, - procfsRootDir.getAbsolutePath()); - // build the process tree. - processTree.getProcessTree(); - - // verify cumulative memory - assertEquals("Cumulative memory does not match", - Long.parseLong("600000"), processTree.getCumulativeVmem()); - } finally { - FileUtil.fullyDelete(procfsRootDir); - } - } - - /** - * Tests that cumulative memory is computed only for - * processes older than a given age. - * @throws IOException if there was a problem setting up the - * fake procfs directories or files. - */ - public void testVMemForOlderProcesses() throws IOException { - // initial list of processes - String[] pids = { "100", "200", "300", "400" }; - // create the fake procfs root directory. - File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); - - try { - setupProcfsRootDir(procfsRootDir); - setupPidDirs(procfsRootDir, pids); - - // create stat objects. - // assuming 100, 200 and 400 are in tree, 300 is not. - ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; - procInfos[0] = new ProcessStatInfo(new String[] - {"100", "proc1", "1", "100", "100", "100000"}); - procInfos[1] = new ProcessStatInfo(new String[] - {"200", "proc2", "100", "100", "100", "200000"}); - procInfos[2] = new ProcessStatInfo(new String[] - {"300", "proc3", "1", "300", "300", "300000"}); - procInfos[3] = new ProcessStatInfo(new String[] - {"400", "proc4", "100", "100", "100", "400000"}); - - writeStatFiles(procfsRootDir, pids, procInfos); - - // crank up the process tree class. - ProcfsBasedProcessTree processTree = - new ProcfsBasedProcessTree("100", true, 100L, - procfsRootDir.getAbsolutePath()); - // build the process tree. - processTree.getProcessTree(); - - // verify cumulative memory - assertEquals("Cumulative memory does not match", - Long.parseLong("700000"), processTree.getCumulativeVmem()); - - // write one more process as child of 100. - String[] newPids = { "500" }; - setupPidDirs(procfsRootDir, newPids); - - ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1]; - newProcInfos[0] = new ProcessStatInfo(new String[] - {"500", "proc5", "100", "100", "100", "500000"}); - writeStatFiles(procfsRootDir, newPids, newProcInfos); - - // check vmem includes the new process. - processTree.getProcessTree(); - assertEquals("Cumulative memory does not include new process", - Long.parseLong("1200000"), processTree.getCumulativeVmem()); - - // however processes older than 1 iteration will retain the older value - assertEquals("Cumulative memory shouldn't have included new process", - Long.parseLong("700000"), processTree.getCumulativeVmem(1)); - - // one more process - newPids = new String[]{ "600" }; - setupPidDirs(procfsRootDir, newPids); - - newProcInfos = new ProcessStatInfo[1]; - newProcInfos[0] = new ProcessStatInfo(new String[] - {"600", "proc6", "100", "100", "100", "600000"}); - writeStatFiles(procfsRootDir, newPids, newProcInfos); - - // refresh process tree - processTree.getProcessTree(); - - // processes older than 2 iterations should be same as before. - assertEquals("Cumulative memory shouldn't have included new processes", - Long.parseLong("700000"), processTree.getCumulativeVmem(2)); - - // processes older than 1 iteration should not include new process, - // but include process 500 - assertEquals("Cumulative memory shouldn't have included new processes", - Long.parseLong("1200000"), processTree.getCumulativeVmem(1)); - - // no processes older than 3 iterations, this should be 0 - assertEquals("Getting non-zero vmem for processes older than 3 iterations", - 0L, processTree.getCumulativeVmem(3)); - } finally { - FileUtil.fullyDelete(procfsRootDir); - } - } - - /** - * Create a directory to mimic the procfs file system's root. - * @param procfsRootDir root directory to create. - * @throws IOException if could not delete the procfs root directory - */ - public static void setupProcfsRootDir(File procfsRootDir) - throws IOException { - // cleanup any existing process root dir. - if (procfsRootDir.exists()) { - assertTrue(FileUtil.fullyDelete(procfsRootDir)); - } - - // create afresh - assertTrue(procfsRootDir.mkdirs()); - } - - /** - * Create PID directories under the specified procfs root directory - * @param procfsRootDir root directory of procfs file system - * @param pids the PID directories to create. - * @throws IOException If PID dirs could not be created - */ - public static void setupPidDirs(File procfsRootDir, String[] pids) - throws IOException { - for (String pid : pids) { - File pidDir = new File(procfsRootDir, pid); - pidDir.mkdir(); - if (!pidDir.exists()) { - throw new IOException ("couldn't make process directory under " + - "fake procfs"); - } else { - LOG.info("created pid dir"); - } - } - } - - /** - * Write stat files under the specified pid directories with data - * setup in the corresponding ProcessStatInfo objects - * @param procfsRootDir root directory of procfs file system - * @param pids the PID directories under which to create the stat file - * @param procs corresponding ProcessStatInfo objects whose data should be - * written to the stat files. - * @throws IOException if stat files could not be written - */ - public static void writeStatFiles(File procfsRootDir, String[] pids, - ProcessStatInfo[] procs) throws IOException { - for (int i=0; i