HADOOP-14296. Move logging APIs over to slf4j in hadoop-tools.
This commit is contained in:
parent
099cbb427a
commit
3369540653
@ -27,18 +27,18 @@
|
|||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
|
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
|
* Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
|
||||||
@ -68,8 +68,8 @@ public void setUp() throws Exception {
|
|||||||
fs.initialize(uri, conf);
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
// Capture logs
|
// Capture logs
|
||||||
logs = LogCapturer.captureLogs(new Log4JLogger(Logger
|
logs = LogCapturer.captureLogs(
|
||||||
.getRootLogger()));
|
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -24,12 +24,12 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to validate Azure storage client side logging. Tests works only when
|
* Test to validate Azure storage client side logging. Tests works only when
|
||||||
@ -97,8 +97,8 @@ private void performWASBOperations() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testLoggingEnabled() throws Exception {
|
public void testLoggingEnabled() throws Exception {
|
||||||
|
|
||||||
LogCapturer logs = LogCapturer.captureLogs(new Log4JLogger(Logger
|
LogCapturer logs = LogCapturer.captureLogs(
|
||||||
.getRootLogger()));
|
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
|
||||||
|
|
||||||
// Update configuration based on the Test.
|
// Update configuration based on the Test.
|
||||||
updateFileSystemConfiguration(true);
|
updateFileSystemConfiguration(true);
|
||||||
@ -111,8 +111,8 @@ public void testLoggingEnabled() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testLoggingDisabled() throws Exception {
|
public void testLoggingDisabled() throws Exception {
|
||||||
|
|
||||||
LogCapturer logs = LogCapturer.captureLogs(new Log4JLogger(Logger
|
LogCapturer logs = LogCapturer.captureLogs(
|
||||||
.getRootLogger()));
|
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
|
||||||
|
|
||||||
// Update configuration based on the Test.
|
// Update configuration based on the Test.
|
||||||
updateFileSystemConfiguration(false);
|
updateFileSystemConfiguration(false);
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.text.MessageFormat;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -85,7 +84,8 @@
|
|||||||
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
||||||
import org.apache.hadoop.yarn.util.UTCClock;
|
import org.apache.hadoop.yarn.util.UTCClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.log4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ -120,7 +120,7 @@ public class SLSRunner extends Configured implements Tool {
|
|||||||
new HashMap<String, Object>();
|
new HashMap<String, Object>();
|
||||||
|
|
||||||
// logger
|
// logger
|
||||||
public final static Logger LOG = Logger.getLogger(SLSRunner.class);
|
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
|
||||||
|
|
||||||
private final static int DEFAULT_MAPPER_PRIORITY = 20;
|
private final static int DEFAULT_MAPPER_PRIORITY = 20;
|
||||||
private final static int DEFAULT_REDUCER_PRIORITY = 10;
|
private final static int DEFAULT_REDUCER_PRIORITY = 10;
|
||||||
@ -322,14 +322,12 @@ private void waitForNodesRunning() throws InterruptedException {
|
|||||||
if (numRunningNodes == numNMs) {
|
if (numRunningNodes == numNMs) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
LOG.info(MessageFormat.format(
|
LOG.info("SLSRunner is waiting for all nodes RUNNING."
|
||||||
"SLSRunner is waiting for all "
|
+ " {} of {} NMs initialized.", numRunningNodes, numNMs);
|
||||||
+ "nodes RUNNING. {0} of {1} NMs initialized.",
|
|
||||||
numRunningNodes, numNMs));
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.",
|
LOG.info("SLSRunner takes {} ms to launch all nodes.",
|
||||||
(System.currentTimeMillis() - startTimeMS)));
|
System.currentTimeMillis() - startTimeMS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -375,7 +373,7 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
|
|||||||
try {
|
try {
|
||||||
createAMForJob(jobIter.next());
|
createAMForJob(jobIter.next());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Failed to create an AM: " + e.getMessage());
|
LOG.error("Failed to create an AM: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -497,7 +495,7 @@ private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
|
|||||||
try {
|
try {
|
||||||
createAMForJob(job, baselineTimeMS);
|
createAMForJob(job, baselineTimeMS);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Failed to create an AM: " + e.getMessage());
|
LOG.error("Failed to create an AM: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
job = reader.getNext();
|
job = reader.getNext();
|
||||||
@ -519,7 +517,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
|
|||||||
jobStartTimeMS -= baselineTimeMs;
|
jobStartTimeMS -= baselineTimeMs;
|
||||||
jobFinishTimeMS -= baselineTimeMs;
|
jobFinishTimeMS -= baselineTimeMs;
|
||||||
if (jobStartTimeMS < 0) {
|
if (jobStartTimeMS < 0) {
|
||||||
LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
|
LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
|
||||||
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
|
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
|
||||||
jobStartTimeMS = 0;
|
jobStartTimeMS = 0;
|
||||||
}
|
}
|
||||||
@ -610,7 +608,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
|
|||||||
jobStartTimeMS -= baselineTimeMS;
|
jobStartTimeMS -= baselineTimeMS;
|
||||||
jobFinishTimeMS -= baselineTimeMS;
|
jobFinishTimeMS -= baselineTimeMS;
|
||||||
if (jobStartTimeMS < 0) {
|
if (jobStartTimeMS < 0) {
|
||||||
LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
|
LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
|
||||||
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
|
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
|
||||||
jobStartTimeMS = 0;
|
jobStartTimeMS = 0;
|
||||||
}
|
}
|
||||||
@ -726,16 +724,14 @@ private void printSimulationInfo() {
|
|||||||
if (printSimulation) {
|
if (printSimulation) {
|
||||||
// node
|
// node
|
||||||
LOG.info("------------------------------------");
|
LOG.info("------------------------------------");
|
||||||
LOG.info(MessageFormat.format(
|
LOG.info("# nodes = {}, # racks = {}, capacity " +
|
||||||
"# nodes = {0}, # racks = {1}, capacity "
|
"of each node {} MB memory and {} vcores.",
|
||||||
+ "of each node {2} MB memory and {3} vcores.",
|
numNMs, numRacks, nmMemoryMB, nmVCores);
|
||||||
numNMs, numRacks, nmMemoryMB, nmVCores));
|
|
||||||
LOG.info("------------------------------------");
|
LOG.info("------------------------------------");
|
||||||
// job
|
// job
|
||||||
LOG.info(MessageFormat.format(
|
LOG.info("# applications = {}, # total " +
|
||||||
"# applications = {0}, # total "
|
"tasks = {}, average # tasks per application = {}",
|
||||||
+ "tasks = {1}, average # tasks per application = {2}",
|
numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)));
|
||||||
numAMs, numTasks, (int) (Math.ceil((numTasks + 0.0) / numAMs))));
|
|
||||||
LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
|
LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
|
||||||
for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
|
for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
|
||||||
AMSimulator am = entry.getValue();
|
AMSimulator am = entry.getValue();
|
||||||
@ -744,15 +740,13 @@ private void printSimulationInfo() {
|
|||||||
}
|
}
|
||||||
LOG.info("------------------------------------");
|
LOG.info("------------------------------------");
|
||||||
// queue
|
// queue
|
||||||
LOG.info(MessageFormat.format(
|
LOG.info("number of queues = {} average number of apps = {}",
|
||||||
"number of queues = {0} average " + "number of apps = {1}",
|
|
||||||
queueAppNumMap.size(),
|
queueAppNumMap.size(),
|
||||||
(int) (Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
|
(int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())));
|
||||||
LOG.info("------------------------------------");
|
LOG.info("------------------------------------");
|
||||||
// runtime
|
// runtime
|
||||||
LOG.info(
|
LOG.info("estimated simulation time is {} seconds",
|
||||||
MessageFormat.format("estimated simulation time is {0}" + " seconds",
|
(long)(Math.ceil(maxRuntime / 1000.0)));
|
||||||
(long) (Math.ceil(maxRuntime / 1000.0))));
|
|
||||||
LOG.info("------------------------------------");
|
LOG.info("------------------------------------");
|
||||||
}
|
}
|
||||||
// package these information in the simulateInfoMap used by other places
|
// package these information in the simulateInfoMap used by other places
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.text.MessageFormat;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -68,13 +67,13 @@
|
|||||||
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
|
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
|
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
|
||||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
||||||
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ -115,7 +114,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|||||||
volatile boolean isAMContainerRunning = false;
|
volatile boolean isAMContainerRunning = false;
|
||||||
volatile Container amContainer;
|
volatile Container amContainer;
|
||||||
|
|
||||||
protected final Logger LOG = Logger.getLogger(AMSimulator.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
|
||||||
|
|
||||||
// resource for AM container
|
// resource for AM container
|
||||||
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
|
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
|
||||||
@ -216,7 +215,7 @@ public void middleStep() throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void lastStep() throws Exception {
|
public void lastStep() throws Exception {
|
||||||
LOG.info(MessageFormat.format("Application {0} is shutting down.", appId));
|
LOG.info("Application {} is shutting down.", appId);
|
||||||
// unregister tracking
|
// unregister tracking
|
||||||
if (isTracked) {
|
if (isTracked) {
|
||||||
untrackApp();
|
untrackApp();
|
||||||
@ -224,7 +223,7 @@ public void lastStep() throws Exception {
|
|||||||
|
|
||||||
// Finish AM container
|
// Finish AM container
|
||||||
if (amContainer != null) {
|
if (amContainer != null) {
|
||||||
LOG.info("AM container = " + amContainer.getId() + " reported to finish");
|
LOG.info("AM container = {} reported to finish", amContainer.getId());
|
||||||
se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
|
se.getNmMap().get(amContainer.getNodeId()).cleanupContainer(
|
||||||
amContainer.getId());
|
amContainer.getId());
|
||||||
} else {
|
} else {
|
||||||
@ -343,7 +342,7 @@ public Object run() throws YarnException, IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
LOG.info(MessageFormat.format("Submit a new application {0}", appId));
|
LOG.info("Submit a new application {}", appId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerAM()
|
private void registerAM()
|
||||||
@ -370,8 +369,7 @@ public RegisterApplicationMasterResponse run() throws Exception {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
LOG.info(MessageFormat.format(
|
LOG.info("Register the application master for application {}", appId);
|
||||||
"Register the application master for application {0}", appId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void trackApp() {
|
private void trackApp() {
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.text.MessageFormat;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
@ -42,10 +41,10 @@
|
|||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
||||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
import org.apache.log4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ -111,7 +110,8 @@ scheduled when all maps have finished (not support slow-start currently).
|
|||||||
// finished
|
// finished
|
||||||
private boolean isFinished = false;
|
private boolean isFinished = false;
|
||||||
|
|
||||||
public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(MRAMSimulator.class);
|
||||||
|
|
||||||
@SuppressWarnings("checkstyle:parameternumber")
|
@SuppressWarnings("checkstyle:parameternumber")
|
||||||
public void init(int id, int heartbeatInterval,
|
public void init(int id, int heartbeatInterval,
|
||||||
@ -135,9 +135,8 @@ public void init(int id, int heartbeatInterval,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info(MessageFormat
|
LOG.info("Added new job with {} mapper and {} reducers",
|
||||||
.format("Added new job with {0} mapper and {1} reducers",
|
allMaps.size(), allReduces.size());
|
||||||
allMaps.size(), allReduces.size()));
|
|
||||||
|
|
||||||
mapTotal = allMaps.size();
|
mapTotal = allMaps.size();
|
||||||
reduceTotal = allReduces.size();
|
reduceTotal = allReduces.size();
|
||||||
@ -165,22 +164,21 @@ protected void processResponseQueue() throws Exception {
|
|||||||
ContainerId containerId = cs.getContainerId();
|
ContainerId containerId = cs.getContainerId();
|
||||||
if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
|
if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
|
||||||
if (assignedMaps.containsKey(containerId)) {
|
if (assignedMaps.containsKey(containerId)) {
|
||||||
LOG.debug(MessageFormat.format("Application {0} has one" +
|
LOG.debug("Application {} has one mapper finished ({}).",
|
||||||
"mapper finished ({1}).", appId, containerId));
|
appId, containerId);
|
||||||
assignedMaps.remove(containerId);
|
assignedMaps.remove(containerId);
|
||||||
mapFinished ++;
|
mapFinished ++;
|
||||||
finishedContainers ++;
|
finishedContainers ++;
|
||||||
} else if (assignedReduces.containsKey(containerId)) {
|
} else if (assignedReduces.containsKey(containerId)) {
|
||||||
LOG.debug(MessageFormat.format("Application {0} has one" +
|
LOG.debug("Application {} has one reducer finished ({}).",
|
||||||
"reducer finished ({1}).", appId, containerId));
|
appId, containerId);
|
||||||
assignedReduces.remove(containerId);
|
assignedReduces.remove(containerId);
|
||||||
reduceFinished ++;
|
reduceFinished ++;
|
||||||
finishedContainers ++;
|
finishedContainers ++;
|
||||||
} else if (amContainer.getId().equals(containerId)){
|
} else if (amContainer.getId().equals(containerId)){
|
||||||
// am container released event
|
// am container released event
|
||||||
isFinished = true;
|
isFinished = true;
|
||||||
LOG.info(MessageFormat.format("Application {0} goes to " +
|
LOG.info("Application {} goes to finish.", appId);
|
||||||
"finish.", appId));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
|
if (mapFinished >= mapTotal && reduceFinished >= reduceTotal) {
|
||||||
@ -189,16 +187,16 @@ protected void processResponseQueue() throws Exception {
|
|||||||
} else {
|
} else {
|
||||||
// container to be killed
|
// container to be killed
|
||||||
if (assignedMaps.containsKey(containerId)) {
|
if (assignedMaps.containsKey(containerId)) {
|
||||||
LOG.debug(MessageFormat.format("Application {0} has one " +
|
LOG.debug("Application {} has one mapper killed ({}).",
|
||||||
"mapper killed ({1}).", appId, containerId));
|
appId, containerId);
|
||||||
pendingFailedMaps.add(assignedMaps.remove(containerId));
|
pendingFailedMaps.add(assignedMaps.remove(containerId));
|
||||||
} else if (assignedReduces.containsKey(containerId)) {
|
} else if (assignedReduces.containsKey(containerId)) {
|
||||||
LOG.debug(MessageFormat.format("Application {0} has one " +
|
LOG.debug("Application {} has one reducer killed ({}).",
|
||||||
"reducer killed ({1}).", appId, containerId));
|
appId, containerId);
|
||||||
pendingFailedReduces.add(assignedReduces.remove(containerId));
|
pendingFailedReduces.add(assignedReduces.remove(containerId));
|
||||||
} else if (amContainer.getId().equals(containerId)){
|
} else if (amContainer.getId().equals(containerId)){
|
||||||
LOG.info(MessageFormat.format("Application {0}'s AM is " +
|
LOG.info("Application {}'s AM is " +
|
||||||
"going to be killed. Waiting for rescheduling...", appId));
|
"going to be killed. Waiting for rescheduling...", appId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -209,8 +207,8 @@ protected void processResponseQueue() throws Exception {
|
|||||||
(mapFinished >= mapTotal) &&
|
(mapFinished >= mapTotal) &&
|
||||||
(reduceFinished >= reduceTotal)) {
|
(reduceFinished >= reduceTotal)) {
|
||||||
isAMContainerRunning = false;
|
isAMContainerRunning = false;
|
||||||
LOG.debug(MessageFormat.format("Application {0} sends out event " +
|
LOG.debug("Application {} sends out event to clean up"
|
||||||
"to clean up its AM container.", appId));
|
+ " its AM container.", appId);
|
||||||
isFinished = true;
|
isFinished = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -219,15 +217,15 @@ protected void processResponseQueue() throws Exception {
|
|||||||
for (Container container : response.getAllocatedContainers()) {
|
for (Container container : response.getAllocatedContainers()) {
|
||||||
if (! scheduledMaps.isEmpty()) {
|
if (! scheduledMaps.isEmpty()) {
|
||||||
ContainerSimulator cs = scheduledMaps.remove();
|
ContainerSimulator cs = scheduledMaps.remove();
|
||||||
LOG.debug(MessageFormat.format("Application {0} starts a " +
|
LOG.debug("Application {} starts to launch a mapper ({}).",
|
||||||
"launch a mapper ({1}).", appId, container.getId()));
|
appId, container.getId());
|
||||||
assignedMaps.put(container.getId(), cs);
|
assignedMaps.put(container.getId(), cs);
|
||||||
se.getNmMap().get(container.getNodeId())
|
se.getNmMap().get(container.getNodeId())
|
||||||
.addNewContainer(container, cs.getLifeTime());
|
.addNewContainer(container, cs.getLifeTime());
|
||||||
} else if (! this.scheduledReduces.isEmpty()) {
|
} else if (! this.scheduledReduces.isEmpty()) {
|
||||||
ContainerSimulator cs = scheduledReduces.remove();
|
ContainerSimulator cs = scheduledReduces.remove();
|
||||||
LOG.debug(MessageFormat.format("Application {0} starts a " +
|
LOG.debug("Application {} starts to launch a reducer ({}).",
|
||||||
"launch a reducer ({1}).", appId, container.getId()));
|
appId, container.getId());
|
||||||
assignedReduces.put(container.getId(), cs);
|
assignedReduces.put(container.getId(), cs);
|
||||||
se.getNmMap().get(container.getNodeId())
|
se.getNmMap().get(container.getNodeId())
|
||||||
.addNewContainer(container, cs.getLifeTime());
|
.addNewContainer(container, cs.getLifeTime());
|
||||||
@ -289,17 +287,15 @@ protected void sendContainerRequest()
|
|||||||
if (!pendingMaps.isEmpty()) {
|
if (!pendingMaps.isEmpty()) {
|
||||||
ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
|
ask = packageRequests(mergeLists(pendingMaps, scheduledMaps),
|
||||||
PRIORITY_MAP);
|
PRIORITY_MAP);
|
||||||
LOG.debug(MessageFormat
|
LOG.debug("Application {} sends out request for {} mappers.",
|
||||||
.format("Application {0} sends out " + "request for {1} mappers.",
|
appId, pendingMaps.size());
|
||||||
appId, pendingMaps.size()));
|
|
||||||
scheduledMaps.addAll(pendingMaps);
|
scheduledMaps.addAll(pendingMaps);
|
||||||
pendingMaps.clear();
|
pendingMaps.clear();
|
||||||
} else if (!pendingFailedMaps.isEmpty()) {
|
} else if (!pendingFailedMaps.isEmpty()) {
|
||||||
ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
|
ask = packageRequests(mergeLists(pendingFailedMaps, scheduledMaps),
|
||||||
PRIORITY_MAP);
|
PRIORITY_MAP);
|
||||||
LOG.debug(MessageFormat.format(
|
LOG.debug("Application {} sends out requests for {} failed mappers.",
|
||||||
"Application {0} sends out " + "requests for {1} failed mappers.",
|
appId, pendingFailedMaps.size());
|
||||||
appId, pendingFailedMaps.size()));
|
|
||||||
scheduledMaps.addAll(pendingFailedMaps);
|
scheduledMaps.addAll(pendingFailedMaps);
|
||||||
pendingFailedMaps.clear();
|
pendingFailedMaps.clear();
|
||||||
}
|
}
|
||||||
@ -308,17 +304,15 @@ protected void sendContainerRequest()
|
|||||||
if (!pendingReduces.isEmpty()) {
|
if (!pendingReduces.isEmpty()) {
|
||||||
ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
|
ask = packageRequests(mergeLists(pendingReduces, scheduledReduces),
|
||||||
PRIORITY_REDUCE);
|
PRIORITY_REDUCE);
|
||||||
LOG.debug(MessageFormat
|
LOG.debug("Application {} sends out requests for {} reducers.",
|
||||||
.format("Application {0} sends out " + "requests for {1} reducers.",
|
appId, pendingReduces.size());
|
||||||
appId, pendingReduces.size()));
|
|
||||||
scheduledReduces.addAll(pendingReduces);
|
scheduledReduces.addAll(pendingReduces);
|
||||||
pendingReduces.clear();
|
pendingReduces.clear();
|
||||||
} else if (!pendingFailedReduces.isEmpty()) {
|
} else if (!pendingFailedReduces.isEmpty()) {
|
||||||
ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
|
ask = packageRequests(mergeLists(pendingFailedReduces, scheduledReduces),
|
||||||
PRIORITY_REDUCE);
|
PRIORITY_REDUCE);
|
||||||
LOG.debug(MessageFormat.format(
|
LOG.debug("Application {} sends out request for {} failed reducers.",
|
||||||
"Application {0} sends out " + "request for {1} failed reducers.",
|
appId, pendingFailedReduces.size());
|
||||||
appId, pendingFailedReduces.size()));
|
|
||||||
scheduledReduces.addAll(pendingFailedReduces);
|
scheduledReduces.addAll(pendingFailedReduces);
|
||||||
pendingFailedReduces.clear();
|
pendingFailedReduces.clear();
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
package org.apache.hadoop.yarn.sls.nodemanager;
|
package org.apache.hadoop.yarn.sls.nodemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.text.MessageFormat;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -51,11 +50,11 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
||||||
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
||||||
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
@ -74,7 +73,7 @@ public class NMSimulator extends TaskRunner.Task {
|
|||||||
private ResourceManager rm;
|
private ResourceManager rm;
|
||||||
// heart beat response id
|
// heart beat response id
|
||||||
private int RESPONSE_ID = 1;
|
private int RESPONSE_ID = 1;
|
||||||
private final static Logger LOG = Logger.getLogger(NMSimulator.class);
|
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);
|
||||||
|
|
||||||
public void init(String nodeIdStr, int memory, int cores,
|
public void init(String nodeIdStr, int memory, int cores,
|
||||||
int dispatchTime, int heartBeatInterval, ResourceManager rm)
|
int dispatchTime, int heartBeatInterval, ResourceManager rm)
|
||||||
@ -120,8 +119,7 @@ public void middleStep() throws Exception {
|
|||||||
while ((cs = containerQueue.poll()) != null) {
|
while ((cs = containerQueue.poll()) != null) {
|
||||||
runningContainers.remove(cs.getId());
|
runningContainers.remove(cs.getId());
|
||||||
completedContainerList.add(cs.getId());
|
completedContainerList.add(cs.getId());
|
||||||
LOG.debug(MessageFormat.format("Container {0} has completed",
|
LOG.debug("Container {} has completed", cs.getId());
|
||||||
cs.getId()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,14 +146,14 @@ public void middleStep() throws Exception {
|
|||||||
synchronized(amContainerList) {
|
synchronized(amContainerList) {
|
||||||
amContainerList.remove(containerId);
|
amContainerList.remove(containerId);
|
||||||
}
|
}
|
||||||
LOG.debug(MessageFormat.format("NodeManager {0} releases " +
|
LOG.debug("NodeManager {} releases an AM ({}).",
|
||||||
"an AM ({1}).", node.getNodeID(), containerId));
|
node.getNodeID(), containerId);
|
||||||
} else {
|
} else {
|
||||||
cs = runningContainers.remove(containerId);
|
cs = runningContainers.remove(containerId);
|
||||||
containerQueue.remove(cs);
|
containerQueue.remove(cs);
|
||||||
releasedContainerList.add(containerId);
|
releasedContainerList.add(containerId);
|
||||||
LOG.debug(MessageFormat.format("NodeManager {0} releases a " +
|
LOG.debug("NodeManager {} releases a container ({}).",
|
||||||
"container ({1}).", node.getNodeID(), containerId));
|
node.getNodeID(), containerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,8 +187,8 @@ private ArrayList<ContainerStatus> generateContainerStatusList() {
|
|||||||
// add complete containers
|
// add complete containers
|
||||||
synchronized(completedContainerList) {
|
synchronized(completedContainerList) {
|
||||||
for (ContainerId cId : completedContainerList) {
|
for (ContainerId cId : completedContainerList) {
|
||||||
LOG.debug(MessageFormat.format("NodeManager {0} completed" +
|
LOG.debug("NodeManager {} completed container ({}).",
|
||||||
" container ({1}).", node.getNodeID(), cId));
|
node.getNodeID(), cId);
|
||||||
csList.add(newContainerStatus(
|
csList.add(newContainerStatus(
|
||||||
cId, ContainerState.COMPLETE, ContainerExitStatus.SUCCESS));
|
cId, ContainerState.COMPLETE, ContainerExitStatus.SUCCESS));
|
||||||
}
|
}
|
||||||
@ -199,8 +197,8 @@ private ArrayList<ContainerStatus> generateContainerStatusList() {
|
|||||||
// released containers
|
// released containers
|
||||||
synchronized(releasedContainerList) {
|
synchronized(releasedContainerList) {
|
||||||
for (ContainerId cId : releasedContainerList) {
|
for (ContainerId cId : releasedContainerList) {
|
||||||
LOG.debug(MessageFormat.format("NodeManager {0} released container" +
|
LOG.debug("NodeManager {} released container ({}).",
|
||||||
" ({1}).", node.getNodeID(), cId));
|
node.getNodeID(), cId);
|
||||||
csList.add(newContainerStatus(
|
csList.add(newContainerStatus(
|
||||||
cId, ContainerState.COMPLETE, ContainerExitStatus.ABORTED));
|
cId, ContainerState.COMPLETE, ContainerExitStatus.ABORTED));
|
||||||
}
|
}
|
||||||
@ -227,8 +225,8 @@ public RMNode getNode() {
|
|||||||
* launch a new container with the given life time
|
* launch a new container with the given life time
|
||||||
*/
|
*/
|
||||||
public void addNewContainer(Container container, long lifeTimeMS) {
|
public void addNewContainer(Container container, long lifeTimeMS) {
|
||||||
LOG.debug(MessageFormat.format("NodeManager {0} launches a new " +
|
LOG.debug("NodeManager {} launches a new container ({}).",
|
||||||
"container ({1}).", node.getNodeID(), container.getId()));
|
node.getNodeID(), container.getId());
|
||||||
if (lifeTimeMS != -1) {
|
if (lifeTimeMS != -1) {
|
||||||
// normal container
|
// normal container
|
||||||
ContainerSimulator cs = new ContainerSimulator(container.getId(),
|
ContainerSimulator cs = new ContainerSimulator(container.getId(),
|
||||||
|
@ -60,14 +60,16 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
||||||
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
||||||
import org.apache.log4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class SchedulerMetrics {
|
public abstract class SchedulerMetrics {
|
||||||
private static final String EOL = System.getProperty("line.separator");
|
private static final String EOL = System.getProperty("line.separator");
|
||||||
private static final int SAMPLING_SIZE = 60;
|
private static final int SAMPLING_SIZE = 60;
|
||||||
private static final Logger LOG = Logger.getLogger(SchedulerMetrics.class);
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(SchedulerMetrics.class);
|
||||||
|
|
||||||
protected ResourceScheduler scheduler;
|
protected ResourceScheduler scheduler;
|
||||||
protected Set<String> trackedQueues;
|
protected Set<String> trackedQueues;
|
||||||
@ -420,7 +422,7 @@ private void initMetricsCSVOutput() {
|
|||||||
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
|
SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
|
||||||
File dir = new File(metricsOutputDir + "/metrics");
|
File dir = new File(metricsOutputDir + "/metrics");
|
||||||
if(!dir.exists() && !dir.mkdirs()) {
|
if(!dir.exists() && !dir.mkdirs()) {
|
||||||
LOG.error("Cannot create directory " + dir.getAbsoluteFile());
|
LOG.error("Cannot create directory {}", dir.getAbsoluteFile());
|
||||||
}
|
}
|
||||||
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
|
final CsvReporter reporter = CsvReporter.forRegistry(metrics)
|
||||||
.formatFor(Locale.US)
|
.formatFor(Locale.US)
|
||||||
|
Loading…
Reference in New Issue
Block a user