diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index 537807227d..a2354784d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -249,10 +249,6 @@
src/test/resources/application_1440536969523_0001.har/part-0
src/test/resources/application_1440536969523_0001.har/_masterindex
src/test/resources/application_1440536969523_0001.har/_SUCCESS
- src/test/resources/application_123456_0001.har/_index
- src/test/resources/application_123456_0001.har/part-0
- src/test/resources/application_123456_0001.har/_masterindex
- src/test/resources/application_123456_0001.har/_SUCCESS
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 5bba2e0a40..56bae26d93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -495,21 +495,16 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
boolean getAllContainers = (containerIdStr == null
|| containerIdStr.isEmpty());
long size = logRequest.getBytes();
- RemoteIterator nodeFiles = LogAggregationUtils
- .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
+ List nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
- if (!nodeFiles.hasNext()) {
+ if (nodeFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
- List allFiles = getAllNodeFiles(nodeFiles, appId);
- if (allFiles.isEmpty()) {
- throw new IOException("There is no available log fils for "
- + "application:" + appId);
- }
- Map checkSumFiles = parseCheckSumFiles(allFiles);
+ Map checkSumFiles = parseCheckSumFiles(nodeFiles);
List fileToRead = getNodeLogFileToRead(
- allFiles, nodeIdStr, appId);
+ nodeFiles, nodeIdStr, appId);
byte[] buf = new byte[65535];
for (FileStatus thisNodeFile : fileToRead) {
String nodeName = thisNodeFile.getPath().getName();
@@ -614,21 +609,16 @@ public List readAggregatedLogsMeta(
containerIdStr.isEmpty());
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId);
- RemoteIterator nodeFiles = LogAggregationUtils
- .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
+ List nodeFiles = LogAggregationUtils
+ .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
this.remoteRootLogDirSuffix);
- if (!nodeFiles.hasNext()) {
+ if (nodeFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
- List allFiles = getAllNodeFiles(nodeFiles, appId);
- if (allFiles.isEmpty()) {
- throw new IOException("There is no available log fils for "
- + "application:" + appId);
- }
- Map checkSumFiles = parseCheckSumFiles(allFiles);
+ Map checkSumFiles = parseCheckSumFiles(nodeFiles);
List fileToRead = getNodeLogFileToRead(
- allFiles, nodeIdStr, appId);
+ nodeFiles, nodeIdStr, appId);
for(FileStatus thisNodeFile : fileToRead) {
try {
Long checkSumIndex = checkSumFiles.get(
@@ -737,37 +727,25 @@ public List getNodeLogFileToRead(
List nodeFiles, String nodeId, ApplicationId appId)
throws IOException {
List listOfFiles = new ArrayList<>();
- for (FileStatus thisNodeFile : nodeFiles) {
- String nodeName = thisNodeFile.getPath().getName();
+ List files = new ArrayList<>(nodeFiles);
+ for (FileStatus file : files) {
+ String nodeName = file.getPath().getName();
if ((nodeId == null || nodeId.isEmpty()
|| nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX) &&
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
- listOfFiles.add(thisNodeFile);
+ if (nodeName.equals(appId + ".har")) {
+ Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
+ files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
+ continue;
+ }
+ listOfFiles.add(file);
}
}
return listOfFiles;
}
- private List getAllNodeFiles(
- RemoteIterator nodeFiles, ApplicationId appId)
- throws IOException {
- List listOfFiles = new ArrayList<>();
- while (nodeFiles != null && nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- String nodeName = thisNodeFile.getPath().getName();
- if (nodeName.equals(appId + ".har")) {
- Path p = new Path("har:///"
- + thisNodeFile.getPath().toUri().getRawPath());
- nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
- continue;
- }
- listOfFiles.add(thisNodeFile);
- }
- return listOfFiles;
- }
-
@Private
public FileStatus getAllChecksumFiles(Map fileMap,
String fileName) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
index 7922679757..9c02c1b148 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
@@ -28,7 +27,6 @@
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
-import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
@@ -366,58 +364,6 @@ public boolean isRollover(final FileContext fc,
sysOutStream.reset();
}
- @Test(timeout = 15000)
- public void testFetchApplictionLogsHar() throws Exception {
- List newLogTypes = new ArrayList<>();
- newLogTypes.add("syslog");
- newLogTypes.add("stdout");
- newLogTypes.add("stderr");
- newLogTypes.add("test1");
- newLogTypes.add("test2");
- URL harUrl = ClassLoader.getSystemClassLoader()
- .getResource("application_123456_0001.har");
- assertNotNull(harUrl);
-
- Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
- + "/logs/application_123456_0001");
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- assertTrue(fs.mkdirs(path));
- Path harPath = new Path(path, "application_123456_0001.har");
- fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
- assertTrue(fs.exists(harPath));
- LogAggregationIndexedFileController fileFormat
- = new LogAggregationIndexedFileController();
- fileFormat.initialize(conf, "Indexed");
- ContainerLogsRequest logRequest = new ContainerLogsRequest();
- logRequest.setAppId(appId);
- logRequest.setNodeId(nodeId.toString());
- logRequest.setAppOwner(USER_UGI.getShortUserName());
- logRequest.setContainerId(containerId.toString());
- logRequest.setBytes(Long.MAX_VALUE);
- List meta = fileFormat.readAggregatedLogsMeta(
- logRequest);
- Assert.assertEquals(meta.size(), 3);
- List fileNames = new ArrayList<>();
- for (ContainerLogMeta log : meta) {
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
- for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
- fileNames.add(file.getFileName());
- }
- }
- fileNames.removeAll(newLogTypes);
- Assert.assertTrue(fileNames.isEmpty());
- boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
- Assert.assertTrue(foundLogs);
- for (String logType : newLogTypes) {
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
- containerId, logType)));
- }
- sysOutStream.reset();
- }
-
private File createAndWriteLocalLogFile(ContainerId containerId,
Path localLogDir, String logType) throws IOException {
File file = new File(localLogDir.toString(), logType);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
deleted file mode 100644
index b042846e1b..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
+++ /dev/null
@@ -1,3 +0,0 @@
-%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513
-%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup
-%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
deleted file mode 100644
index cda8cbdcab..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
+++ /dev/null
@@ -1,2 +0,0 @@
-3
-0 1897968749 0 280
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0
deleted file mode 100644
index aea95fa8da..0000000000
Binary files a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 and /dev/null differ
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index ff50330df3..fc30a805bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -112,9 +112,4 @@ public abstract void setRunningApplications(
* @param physicalResource Physical resources in the node.
*/
public abstract void setPhysicalResource(Resource physicalResource);
-
- public abstract List getLogAggregationReportsForApps();
-
- public abstract void setLogAggregationReportsForApps(
- List logAggregationReportsForApps);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index f1d73397ff..eda06d0dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -38,13 +38,11 @@
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
-import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -59,8 +57,6 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List runningApplications = null;
private Set labels = null;
- private List logAggregationReportsForApps = null;
-
/** Physical resources in the node. */
private Resource physicalResource = null;
@@ -104,48 +100,6 @@ private synchronized void mergeLocalToBuilder() {
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
- if (this.logAggregationReportsForApps != null) {
- addLogAggregationStatusForAppsToProto();
- }
- }
-
- private void addLogAggregationStatusForAppsToProto() {
- maybeInitBuilder();
- builder.clearLogAggregationReportsForApps();
- if (this.logAggregationReportsForApps == null) {
- return;
- }
- Iterable it =
- new Iterable() {
- @Override
- public Iterator iterator() {
- return new Iterator() {
- private Iterator iter =
- logAggregationReportsForApps.iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public LogAggregationReportProto next() {
- return convertToProtoFormat(iter.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- builder.addAllLogAggregationReportsForApps(it);
- }
-
- private LogAggregationReportProto convertToProtoFormat(
- LogAggregationReport value) {
- return ((LogAggregationReportPBImpl) value).getProto();
}
private synchronized void addNMContainerStatusesToProto() {
@@ -446,37 +400,4 @@ private static NMContainerStatusProto convertToProtoFormat(
NMContainerStatus c) {
return ((NMContainerStatusPBImpl)c).getProto();
}
-
- @Override
- public List getLogAggregationReportsForApps() {
- if (this.logAggregationReportsForApps != null) {
- return this.logAggregationReportsForApps;
- }
- initLogAggregationReportsForApps();
- return logAggregationReportsForApps;
- }
-
- private void initLogAggregationReportsForApps() {
- RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
- List list =
- p.getLogAggregationReportsForAppsList();
- this.logAggregationReportsForApps = new ArrayList();
- for (LogAggregationReportProto c : list) {
- this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
- }
- }
-
- private LogAggregationReport convertFromProtoFormat(
- LogAggregationReportProto logAggregationReport) {
- return new LogAggregationReportPBImpl(logAggregationReport);
- }
-
- @Override
- public void setLogAggregationReportsForApps(
- List logAggregationStatusForApps) {
- if(logAggregationStatusForApps == null) {
- builder.clearLogAggregationReportsForApps();
- }
- this.logAggregationReportsForApps = logAggregationStatusForApps;
- }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 1b090bf232..e782cc251d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -66,7 +66,6 @@ message RegisterNodeManagerRequestProto {
repeated ApplicationIdProto runningApplications = 7;
optional NodeLabelsProto nodeLabels = 8;
optional ResourceProto physicalResource = 9;
- repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
}
message RegisterNodeManagerResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index d3dd2b98a7..d7e3b528f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -33,7 +33,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -121,8 +121,6 @@ public interface Context {
NMTimelinePublisher getNMTimelinePublisher();
- NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
-
ContainerExecutor getContainerExecutor();
ContainerStateTransitionListener getContainerStateTransitionListener();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index d5b8fd3bcf..42b7b5f8cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -58,7 +58,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
@@ -622,8 +621,6 @@ public static class NMContext implements Context {
private ResourcePluginManager resourcePluginManager;
- private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
-
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -865,15 +862,6 @@ public DeletionService getDeletionService() {
public void setDeletionService(DeletionService deletionService) {
this.deletionService = deletionService;
}
-
- public void setNMLogAggregationStatusTracker(
- NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
- this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
- }
- @Override
- public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
- return nmLogAggregationStatusTracker;
- }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8154723f08..3d3f573769 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -381,20 +381,6 @@ nodeManagerVersionId, containerReports, getRunningApplications(),
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
- if (logAggregationEnabled) {
- // pull log aggregation status for application running in this NM
- List logAggregationReports =
- context.getNMLogAggregationStatusTracker()
- .pullCachedLogAggregationReports();
- if (LOG.isDebugEnabled()) {
- LOG.debug("The cache log aggregation status size:"
- + logAggregationReports.size());
- }
- if (logAggregationReports != null
- && !logAggregationReports.isEmpty()) {
- request.setLogAggregationReportsForApps(logAggregationReports);
- }
- }
regNMResponse =
resourceTracker.registerNodeManager(request);
// Make sure rmIdentifier is set before we release the lock
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 0b2fca1ec1..6b4d517599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -109,7 +109,6 @@
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
@@ -139,7 +138,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
@@ -228,8 +226,6 @@ private enum ReInitOp {
// NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher;
- private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
-
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -287,10 +283,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
addService(dispatcher);
- this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
- context);
- ((NMContext)context).setNMLogAggregationStatusTracker(
- this.nmLogAggregationStatusTracker);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -566,11 +558,6 @@ protected NMTimelinePublisher createNMTimelinePublisher(Context ctxt) {
return nmTimelinePublisherLocal;
}
- protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
- Context ctxt) {
- return new NMLogAggregationStatusTracker(ctxt);
- }
-
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@@ -666,7 +653,6 @@ protected void serviceStart() throws Exception {
}
}
- this.nmLogAggregationStatusTracker.start();
LOG.info("ContainerManager started at " + connectAddress);
LOG.info("ContainerManager bound to " + initialAddress);
}
@@ -705,7 +691,6 @@ public void serviceStop() throws Exception {
server.stop();
}
super.serviceStop();
- this.nmLogAggregationStatusTracker.stop();
}
public void cleanUpApplicationsOnNMShutDown() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c7e06ff73b..4ac150ac06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -385,8 +385,7 @@ private void sendLogAggregationReport(
logAggregationSucceedInThisCycle
? LogAggregationStatus.RUNNING
: LogAggregationStatus.RUNNING_WITH_FAILURE;
- sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
- false);
+ sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
if (appFinished) {
// If the app is finished, one extra final report with log aggregation
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
@@ -395,22 +394,18 @@ private void sendLogAggregationReport(
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
- sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
+ sendLogAggregationReportInternal(finalLogAggregationStatus, "");
}
}
private void sendLogAggregationReportInternal(
- LogAggregationStatus logAggregationStatus, String diagnosticMessage,
- boolean finalized) {
+ LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
report.setDiagnosticMessage(diagnosticMessage);
report.setLogAggregationStatus(logAggregationStatus);
this.context.getLogAggregationStatusForApps().add(report);
- this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus(
- appId, logAggregationStatus, System.currentTimeMillis(),
- diagnosticMessage, finalized);
}
@SuppressWarnings("unchecked")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
deleted file mode 100644
index 6d785d92c4..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NMLogAggregationStatusTracker {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
-
- private final ReadLock updateLocker;
- private final WriteLock pullLocker;
- private final Context nmContext;
- private final long rollingInterval;
- private final Timer timer;
- private final Map trackers;
- private boolean disabled = false;
-
- public NMLogAggregationStatusTracker(Context context) {
- this.nmContext = context;
- Configuration conf = context.getConf();
- if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
- disabled = true;
- }
- this.trackers = new HashMap<>();
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- this.updateLocker = lock.readLock();
- this.pullLocker = lock.writeLock();
- this.timer = new Timer();
- this.rollingInterval = conf.getLong(
- YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
- LOG.info("the rolling interval seconds for the NodeManager Cached Log "
- + "aggregation status is " + (rollingInterval/1000));
- }
-
- public void start() {
- if (disabled) {
- LOG.warn("Log Aggregation is disabled."
- + "So is the LogAggregationStatusTracker.");
- } else {
- this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
- rollingInterval, rollingInterval);
- }
- }
-
- public void stop() {
- this.timer.cancel();
- }
-
- public void updateLogAggregationStatus(ApplicationId appId,
- LogAggregationStatus logAggregationStatus, long updateTime,
- String diagnosis, boolean finalized) {
- if (disabled) {
- LOG.warn("The log aggregation is diabled. No need to update "
- + "the log aggregation status");
- }
- this.updateLocker.lock();
- try {
- LogAggregationTrakcer tracker = trackers.get(appId);
- if (tracker == null) {
- Application application = this.nmContext.getApplications().get(appId);
- if (application == null) {
- // the application has already finished or
- // this application is unknown application.
- // Check the log aggregation status update time, if the update time is
- // still in the period of timeout, we add it to the trackers map.
- // Otherwise, we ignore it.
- long currentTime = System.currentTimeMillis();
- if (currentTime - updateTime > rollingInterval) {
- LOG.warn("Ignore the log aggregation status update request "
- + "for the application:" + appId + ". The log aggregation status"
- + " update time is " + updateTime + " while the request process "
- + "time is " + currentTime + ".");
- return;
- }
- }
- LogAggregationTrakcer newTracker = new LogAggregationTrakcer(
- logAggregationStatus, diagnosis);
- newTracker.setLastModifiedTime(updateTime);
- newTracker.setFinalized(finalized);
- trackers.put(appId, newTracker);
- } else {
- if (tracker.isFinalized()) {
- LOG.warn("Ignore the log aggregation status update request "
- + "for the application:" + appId + ". The cached log aggregation "
- + "status is " + tracker.getLogAggregationStatus() + ".");
- } else {
- if (tracker.getLastModifiedTime() > updateTime) {
- LOG.warn("Ignore the log aggregation status update request "
- + "for the application:" + appId + ". The request log "
- + "aggregation status update is older than the cached "
- + "log aggregation status.");
- } else {
- tracker.setLogAggregationStatus(logAggregationStatus);
- tracker.setDiagnosis(diagnosis);
- tracker.setLastModifiedTime(updateTime);
- tracker.setFinalized(finalized);
- trackers.put(appId, tracker);
- }
- }
- }
- } finally {
- this.updateLocker.unlock();
- }
- }
-
- public List pullCachedLogAggregationReports() {
- List reports = new ArrayList<>();
- if (disabled) {
- LOG.warn("The log aggregation is diabled."
- + "There is no cached log aggregation status.");
- return reports;
- }
- this.pullLocker.lock();
- try {
- for(Entry tracker :
- trackers.entrySet()) {
- LogAggregationTrakcer current = tracker.getValue();
- LogAggregationReport report = LogAggregationReport.newInstance(
- tracker.getKey(), current.getLogAggregationStatus(),
- current.getDiagnosis());
- reports.add(report);
- }
- return reports;
- } finally {
- this.pullLocker.unlock();
- }
- }
-
- private class LogAggregationStatusRoller extends TimerTask {
- @Override
- public void run() {
- rollLogAggregationStatus();
- }
- }
-
- @Private
- void rollLogAggregationStatus() {
- this.pullLocker.lock();
- try {
- long currentTimeStamp = System.currentTimeMillis();
- LOG.info("Rolling over the cached log aggregation status.");
- Iterator> it = trackers
- .entrySet().iterator();
- while (it.hasNext()) {
- Entry tracker = it.next();
- // the application has finished.
- if (nmContext.getApplications().get(tracker.getKey()) == null) {
- if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
- > rollingInterval) {
- it.remove();
- }
- }
- }
- } finally {
- this.pullLocker.unlock();
- }
- }
-
- private static class LogAggregationTrakcer {
- private LogAggregationStatus logAggregationStatus;
- private long lastModifiedTime;
- private boolean finalized;
- private String diagnosis;
-
- public LogAggregationTrakcer(
- LogAggregationStatus logAggregationStatus, String diagnosis) {
- this.setLogAggregationStatus(logAggregationStatus);
- this.setDiagnosis(diagnosis);
- }
-
- public LogAggregationStatus getLogAggregationStatus() {
- return logAggregationStatus;
- }
-
- public void setLogAggregationStatus(
- LogAggregationStatus logAggregationStatus) {
- this.logAggregationStatus = logAggregationStatus;
- }
-
- public long getLastModifiedTime() {
- return lastModifiedTime;
- }
-
- public void setLastModifiedTime(long lastModifiedTime) {
- this.lastModifiedTime = lastModifiedTime;
- }
-
- public boolean isFinalized() {
- return finalized;
- }
-
- public void setFinalized(boolean finalized) {
- this.finalized = finalized;
- }
-
- public String getDiagnosis() {
- return diagnosis;
- }
-
- public void setDiagnosis(String diagnosis) {
- this.diagnosis = diagnosis;
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 4ac268b14e..9602142a14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
@@ -815,10 +814,5 @@ public NodeManagerMetrics getNodeManagerMetrics() {
public DeletionService getDeletionService() {
return null;
}
-
- @Override
- public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
- return null;
- }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
deleted file mode 100644
index e51bac179d..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestNMLogAggregationStatusTracker {
-
- @Test
- public void testNMLogAggregationStatusUpdate() {
- Context mockContext = mock(Context.class);
- ConcurrentMap apps = new ConcurrentHashMap<>();
- when(mockContext.getApplications()).thenReturn(apps);
- // the log aggregation is disabled.
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
- when(mockContext.getConf()).thenReturn(conf);
- NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
- mockContext);
- ApplicationId appId0 = ApplicationId.newInstance(0, 0);
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
- List reports = tracker
- .pullCachedLogAggregationReports();
- // we can not get any cached log aggregation status because
- // the log aggregation is disabled.
- Assert.assertTrue(reports.isEmpty());
-
- // enable the log aggregation.
- conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
- when(mockContext.getConf()).thenReturn(conf);
- tracker = new NMLogAggregationStatusTracker(mockContext);
- // update the log aggregation status for an un-existed application
- // the update time is not in the period of timeout.
- // So, we should not cache the log application status for this
- // application.
- appId0 = ApplicationId.newInstance(0, 0);
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.RUNNING,
- System.currentTimeMillis() - 15 * 60 * 1000, "", false);
- reports = tracker
- .pullCachedLogAggregationReports();
- Assert.assertTrue(reports.isEmpty());
-
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.RUNNING,
- System.currentTimeMillis() - 60 * 1000, "", false);
- reports = tracker
- .pullCachedLogAggregationReports();
- Assert.assertTrue(reports.size() == 1);
- Assert.assertTrue(reports.get(0).getLogAggregationStatus()
- == LogAggregationStatus.RUNNING);
-
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.SUCCEEDED,
- System.currentTimeMillis() - 1 * 60 * 1000, "", true);
- reports = tracker
- .pullCachedLogAggregationReports();
- Assert.assertTrue(reports.size() == 1);
- Assert.assertTrue(reports.get(0).getLogAggregationStatus()
- == LogAggregationStatus.SUCCEEDED);
-
- // the log aggregation status is finalized. So, we would
- // ingore the following update
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.FAILED,
- System.currentTimeMillis() - 1 * 60 * 1000, "", true);
- reports = tracker
- .pullCachedLogAggregationReports();
- Assert.assertTrue(reports.size() == 1);
- Assert.assertTrue(reports.get(0).getLogAggregationStatus()
- == LogAggregationStatus.SUCCEEDED);
- }
-
- public void testLogAggregationStatusRoller() throws InterruptedException {
- Context mockContext = mock(Context.class);
- Configuration conf = new YarnConfiguration();
- conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
- 10 * 1000);
- when(mockContext.getConf()).thenReturn(conf);
- NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
- mockContext);
- ApplicationId appId0 = ApplicationId.newInstance(0, 0);
- tracker.updateLogAggregationStatus(appId0,
- LogAggregationStatus.RUNNING,
- System.currentTimeMillis(), "", false);
- // sleep 10s
- Thread.sleep(10*1000);
- // the cache log aggregation status should be deleted.
- List reports = tracker
- .pullCachedLogAggregationReports();
- Assert.assertTrue(reports.size() == 0);
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index e997192811..9d95f636aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -399,21 +399,9 @@ public RegisterNodeManagerResponse registerNodeManager(
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
- RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
- request.getNMContainerStatuses(),
- request.getRunningApplications());
- if (request.getLogAggregationReportsForApps() != null
- && !request.getLogAggregationReportsForApps().isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found the number of previous cached log aggregation "
- + "status from nodemanager:" + nodeId + " is :"
- + request.getLogAggregationReportsForApps().size());
- }
- startEvent.setLogAggregationReportsForApps(request
- .getLogAggregationReportsForApps());
- }
this.rmContext.getDispatcher().getEventHandler().handle(
- startEvent);
+ new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+ request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
@@ -438,6 +426,7 @@ public RegisterNodeManagerResponse registerNodeManager(
this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeStartedEvent(nodeId, null, null));
+
} else {
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 14bc0daf91..3cbde01abe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -866,12 +866,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
- List logAggregationReportsForApps =
- startEvent.getLogAggregationReportsForApps();
- if (logAggregationReportsForApps != null
- && !logAggregationReportsForApps.isEmpty()) {
- rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
- }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
index 397699453f..4fc983ab7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
@@ -22,14 +22,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeStartedEvent extends RMNodeEvent {
private List containerStatuses;
private List runningApplications;
- private List logAggregationReportsForApps;
public RMNodeStartedEvent(NodeId nodeId,
List containerReports,
@@ -46,13 +44,4 @@ public List getNMContainerStatuses() {
public List getRunningApplications() {
return runningApplications;
}
-
- public List getLogAggregationReportsForApps() {
- return this.logAggregationReportsForApps;
- }
-
- public void setLogAggregationReportsForApps(
- List logAggregationReportsForApps) {
- this.logAggregationReportsForApps = logAggregationReportsForApps;
- }
}