From 15ccd967ee3e7046a50522089f67ba01f36ec76a Mon Sep 17 00:00:00 2001 From: Junping Du Date: Thu, 14 May 2015 10:57:36 -0700 Subject: [PATCH] YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in RMApps. Contributed by Xuan Gong. --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/records/LogAggregationStatus.java | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 10 + .../src/main/proto/yarn_protos.proto | 1 + .../src/main/resources/yarn-default.xml | 8 + .../protocolrecords/LogAggregationReport.java | 16 +- .../protocolrecords/NodeHeartbeatRequest.java | 7 +- .../impl/pb/LogAggregationReportPBImpl.java | 40 --- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 82 +++---- .../hadoop/yarn/server/webapp/AppBlock.java | 19 +- .../yarn_server_common_service_protos.proto | 14 +- .../nodemanager/NodeStatusUpdaterImpl.java | 46 +--- .../logaggregation/AppLogAggregatorImpl.java | 23 +- .../resourcemanager/rmapp/RMAppImpl.java | 228 ++++++++++++++---- .../resourcemanager/rmnode/RMNodeImpl.java | 13 +- .../rmnode/RMNodeStatusEvent.java | 11 +- .../resourcemanager/webapp/RMAppBlock.java | 11 +- .../RMAppLogAggregationStatusBlock.java | 37 ++- .../TestRMAppLogAggregationStatus.java | 181 ++++++++++---- 19 files changed, 471 insertions(+), 281 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0346c5432b..e0f2c524ab 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -111,6 +111,9 @@ Release 2.8.0 - UNRELEASED YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. (Jonathan Eagles via zjshen) + YARN-3505. Node's Log Aggregation Report with SUCCEED should not cached in + RMApps. (Xuan Gong via junping_du) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java index da1230c27f..1e10972e8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java @@ -34,6 +34,8 @@ public enum LogAggregationStatus { /** Log Aggregation is Running. */ RUNNING, + /** Log Aggregation is Running, but has failures in previous cycles. */ + RUNNING_WITH_FAILURE, /** * Log Aggregation is Succeeded. All of the logs have been aggregated * successfully. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 94f3e603ca..52fff149ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -718,6 +718,16 @@ private static void addDeprecatedKeys() { + "proxy-user-privileges.enabled"; public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false; + /** + * How many diagnostics/failure messages can be saved in RM for + * log aggregation. It also defines the number of diagnostics/failure + * messages can be shown in log aggregation web ui. + */ + public static final String RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = + RM_PREFIX + "max-log-aggregation-diagnostics-in-memory"; + public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = + 10; + /** Whether to enable log aggregation */ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c45081ac3c..4095676ba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -204,6 +204,7 @@ enum LogAggregationStatusProto { LOG_SUCCEEDED = 4; LOG_FAILED = 5; LOG_TIME_OUT = 6; + LOG_RUNNING_WITH_FAILURE = 7; } message ApplicationAttemptReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 4d74f7622f..1dd88bdcd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -674,6 +674,14 @@ 10 + + Number of diagnostics/failure messages can be saved in RM for + log aggregation. It also defines the number of diagnostics/failure + messages can be shown in log aggregation web ui. + yarn.resourcemanager.max-log-aggregation-diagnostics-in-memory + 10 + + The hostname of the NM. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java index b2270d82fe..d76f4cde50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java @@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; /** @@ -32,7 +31,6 @@ * It includes details such as: * @@ -45,7 +43,7 @@ public abstract class LogAggregationReport { @Public @Unstable public static LogAggregationReport newInstance(ApplicationId appId, - NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) { + LogAggregationStatus status, String diagnosticMessage) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); report.setLogAggregationStatus(status); @@ -65,18 +63,6 @@ public static LogAggregationReport newInstance(ApplicationId appId, @Unstable public abstract void setApplicationId(ApplicationId appId); - /** - * Get the NodeId. - * @return NodeId - */ - @Public - @Unstable - public abstract NodeId getNodeId(); - - @Public - @Unstable - public abstract void setNodeId(NodeId nodeId); - /** * Get the LogAggregationStatus. * @return LogAggregationStatus diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index 227363fec7..767e4b0c46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,10 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import java.util.Map; +import java.util.List; import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -54,9 +53,9 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); - public abstract Map + public abstract List getLogAggregationReportsForApps(); public abstract void setLogAggregationReportsForApps( - Map logAggregationReportsForApps); + List logAggregationReportsForApps); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java index 75b6eab6fc..ac6ad2e89f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java @@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -45,7 +42,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport { boolean viaProto = false; private ApplicationId applicationId; - private NodeId nodeId; public LogAggregationReportPBImpl() { builder = LogAggregationReportProto.newBuilder(); @@ -89,12 +85,6 @@ private void mergeLocalToBuilder() { builder.getApplicationId())) { builder.setApplicationId(convertToProtoFormat(this.applicationId)); } - - if (this.nodeId != null - && !((NodeIdPBImpl) this.nodeId).getProto().equals( - builder.getNodeId())) { - builder.setNodeId(convertToProtoFormat(this.nodeId)); - } } private void mergeLocalToProto() { @@ -191,34 +181,4 @@ public void setDiagnosticMessage(String diagnosticMessage) { } builder.setDiagnostics(diagnosticMessage); } - - @Override - public NodeId getNodeId() { - if (this.nodeId != null) { - return this.nodeId; - } - - LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasNodeId()) { - return null; - } - this.nodeId = convertFromProtoFormat(p.getNodeId()); - return this.nodeId; - } - - @Override - public void setNodeId(NodeId nodeId) { - maybeInitBuilder(); - if (nodeId == null) - builder.clearNodeId(); - this.nodeId = nodeId; - } - - private NodeIdProto convertToProtoFormat(NodeId t) { - return ((NodeIdPBImpl) t).getProto(); - } - - private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { - return new NodeIdPBImpl(nodeId); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 03db39ce63..81f173d85d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,21 +18,16 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; -import java.util.HashMap; +import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -51,9 +46,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; private Set labels = null; - private Map - logAggregationReportsForApps = null; - + private List logAggregationReportsForApps = null; + public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); } @@ -110,12 +104,35 @@ private void mergeLocalToBuilder() { private void addLogAggregationStatusForAppsToProto() { maybeInitBuilder(); builder.clearLogAggregationReportsForApps(); - for (Entry entry : logAggregationReportsForApps - .entrySet()) { - builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto - .newBuilder().setAppId(convertToProtoFormat(entry.getKey())) - .setLogAggregationReport(convertToProtoFormat(entry.getValue()))); + if (this.logAggregationReportsForApps == null) { + return; } + Iterable it = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = + logAggregationReportsForApps.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public LogAggregationReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllLogAggregationReportsForApps(it); } private LogAggregationReportProto convertToProtoFormat( @@ -246,17 +263,8 @@ private void initNodeLabels() { labels = new HashSet(nodeLabels.getElementsList()); } - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - @Override - public Map - getLogAggregationReportsForApps() { + public List getLogAggregationReportsForApps() { if (this.logAggregationReportsForApps != null) { return this.logAggregationReportsForApps; } @@ -266,15 +274,11 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId t) { private void initLogAggregationReportsForApps() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = + List list = p.getLogAggregationReportsForAppsList(); - this.logAggregationReportsForApps = - new HashMap(); - for (LogAggregationReportsForAppsProto c : list) { - ApplicationId appId = convertFromProtoFormat(c.getAppId()); - LogAggregationReport report = - convertFromProtoFormat(c.getLogAggregationReport()); - this.logAggregationReportsForApps.put(appId, report); + this.logAggregationReportsForApps = new ArrayList(); + for (LogAggregationReportProto c : list) { + this.logAggregationReportsForApps.add(convertFromProtoFormat(c)); } } @@ -285,14 +289,10 @@ private LogAggregationReport convertFromProtoFormat( @Override public void setLogAggregationReportsForApps( - Map logAggregationStatusForApps) { - if (logAggregationStatusForApps == null - || logAggregationStatusForApps.isEmpty()) { - return; + List logAggregationStatusForApps) { + if(logAggregationStatusForApps == null) { + builder.clearLogAggregationReportsForApps(); } - maybeInitBuilder(); - this.logAggregationReportsForApps = - new HashMap(); - this.logAggregationReportsForApps.putAll(logAggregationStatusForApps); + this.logAggregationReportsForApps = logAggregationStatusForApps; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java index dd5a4c8363..f46197ef08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; @@ -192,8 +193,17 @@ public ApplicationReport run() throws Exception { : "ApplicationMaster"); if (webUiType != null && webUiType.equals(YarnWebParams.RM_WEB_UI)) { - overviewTable._("Log Aggregation Status", - root_url("logaggregationstatus", app.getAppId()), "Status"); + LogAggregationStatus status = getLogAggregationStatus(); + if (status == null) { + overviewTable._("Log Aggregation Status", "N/A"); + } else if (status == LogAggregationStatus.DISABLED + || status == LogAggregationStatus.NOT_START + || status == LogAggregationStatus.SUCCEEDED) { + overviewTable._("Log Aggregation Status", status.name()); + } else { + overviewTable._("Log Aggregation Status", + root_url("logaggregationstatus", app.getAppId()), status.name()); + } } overviewTable._("Diagnostics:", app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo()); @@ -342,4 +352,9 @@ private String clairfyAppFinalStatus(FinalApplicationStatus status) { protected void createApplicationMetricsTable(Block html) { } + + // This will be overrided in RMAppBlock + protected LogAggregationStatus getLogAggregationStatus() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d34c9f7e51..c027ac080e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -50,19 +50,13 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; optional StringArrayProto nodeLabels = 4; - repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5; -} - -message LogAggregationReportsForAppsProto { - optional ApplicationIdProto appId = 1; - optional LogAggregationReportProto log_aggregation_report = 2; + repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; } message LogAggregationReportProto { -optional ApplicationIdProto application_id = 1; -optional NodeIdProto node_id = 2; -optional LogAggregationStatusProto log_aggregation_status = 3; -optional string diagnostics = 4 [default = "N/A"]; + optional ApplicationIdProto application_id = 1; + optional LogAggregationStatusProto log_aggregation_status = 2; + optional string diagnostics = 3 [default = "N/A"]; } message NodeHeartbeatResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0eb7ff4ea4..8046228da8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -666,7 +665,7 @@ public void run() { if (logAggregationEnabled) { // pull log aggregation status for application running in this NM - Map logAggregationReports = + List logAggregationReports = getLogAggregationReportsForApps(context .getLogAggregationStatusForApps()); if (logAggregationReports != null @@ -810,47 +809,14 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { statusUpdater.start(); } - private Map - getLogAggregationReportsForApps( - ConcurrentLinkedQueue lastestLogAggregationStatus) { - Map latestLogAggregationReports = - new HashMap(); + private List getLogAggregationReportsForApps( + ConcurrentLinkedQueue lastestLogAggregationStatus) { LogAggregationReport status; while ((status = lastestLogAggregationStatus.poll()) != null) { this.logAggregationReportForAppsTempList.add(status); } - for (LogAggregationReport logAggregationReport - : this.logAggregationReportForAppsTempList) { - LogAggregationReport report = null; - if (latestLogAggregationReports.containsKey(logAggregationReport - .getApplicationId())) { - report = - latestLogAggregationReports.get(logAggregationReport - .getApplicationId()); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - String message = report.getDiagnosticMessage(); - if (logAggregationReport.getDiagnosticMessage() != null - && !logAggregationReport.getDiagnosticMessage().isEmpty()) { - if (message != null) { - message += logAggregationReport.getDiagnosticMessage(); - } else { - message = logAggregationReport.getDiagnosticMessage(); - } - report.setDiagnosticMessage(message); - } - } else { - report = Records.newRecord(LogAggregationReport.class); - report.setApplicationId(logAggregationReport.getApplicationId()); - report.setNodeId(this.nodeId); - report.setLogAggregationStatus(logAggregationReport - .getLogAggregationStatus()); - report - .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage()); - } - latestLogAggregationReports.put(logAggregationReport.getApplicationId(), - report); - } - return latestLogAggregationReports; + List reports = new ArrayList(); + reports.addAll(logAggregationReportForAppsTempList); + return reports; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 3111f100e2..dd2ab2571a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -306,6 +306,7 @@ private void uploadLogsForContainers(boolean appFinished) { + currentTime); String diagnosticMessage = ""; + boolean logAggregationSucceedInThisCycle = true; final boolean rename = uploadedLogsInThisCycle; try { userUgi.doAs(new PrivilegedExceptionAction() { @@ -338,20 +339,28 @@ public Object run() throws Exception { + LogAggregationUtils.getNodeString(nodeId) + " at " + Times.format(currentTime) + "\n"; renameTemporaryLogFileFailed = true; + logAggregationSucceedInThisCycle = false; } LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); - report.setNodeId(nodeId); report.setDiagnosticMessage(diagnosticMessage); - if (appFinished) { - report.setLogAggregationStatus(renameTemporaryLogFileFailed - ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); - } else { - report.setLogAggregationStatus(LogAggregationStatus.RUNNING); - } + report.setLogAggregationStatus(logAggregationSucceedInThisCycle + ? LogAggregationStatus.RUNNING + : LogAggregationStatus.RUNNING_WITH_FAILURE); this.context.getLogAggregationStatusForApps().add(report); + if (appFinished) { + // If the app is finished, one extra final report with log aggregation + // status SUCCEEDED/FAILED will be sent to RM to inform the RM + // that the log aggregation in this NM is completed. + LogAggregationReport finalReport = + Records.newRecord(LogAggregationReport.class); + finalReport.setApplicationId(appId); + finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed + ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); + this.context.getLogAggregationStatusForApps().add(report); + } } finally { if (writer != null) { writer.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 8abc47802f..f3dacd6a49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -22,12 +22,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -152,6 +156,13 @@ public class RMAppImpl implements RMApp, Recoverable { private final Map logAggregationStatus = new HashMap(); private LogAggregationStatus logAggregationStatusForAppReport; + private int logAggregationSucceed = 0; + private int logAggregationFailed = 0; + private Map> logAggregationDiagnosticsForNMs = + new HashMap>(); + private Map> logAggregationFailureMessagesForNMs = + new HashMap>(); + private final int maxLogAggregationDiagnosticsInMemory; // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -437,6 +448,14 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + if (this.logAggregationEnabled) { + this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; + } else { + this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED; + } + maxLogAggregationDiagnosticsInMemory = conf.getInt( + YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, + YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); } @Override @@ -834,10 +853,9 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), - LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent - .getNodeId(), app.logAggregationEnabled - ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, - "")); + LogAggregationReport.newInstance(app.applicationId, + app.logAggregationEnabled ? LogAggregationStatus.NOT_START + : LogAggregationStatus.DISABLED, "")); } }; } @@ -1401,18 +1419,20 @@ public Map getLogAggregationReportsForApp() { Map outputs = new HashMap(); outputs.putAll(logAggregationStatus); - for (Entry output : outputs.entrySet()) { - if (!output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.TIME_OUT) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { - output.getValue().setLogAggregationStatus( - LogAggregationStatus.TIME_OUT); + if (!isLogAggregationFinished()) { + for (Entry output : outputs.entrySet()) { + if (!output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.TIME_OUT) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FAILED) + && isAppInFinalState(this) + && System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + output.getValue().setLogAggregationStatus( + LogAggregationStatus.TIME_OUT); + } } } return outputs; @@ -1424,32 +1444,46 @@ && isAppInFinalState(this) public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { try { this.writeLock.lock(); - if (this.logAggregationEnabled) { + if (this.logAggregationEnabled && !isLogAggregationFinished()) { LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); + boolean stateChangedToFinal = false; if (curReport == null) { this.logAggregationStatus.put(nodeId, report); + if (isLogAggregationFinishedForNM(report)) { + stateChangedToFinal = true; + } } else { - if (curReport.getLogAggregationStatus().equals( - LogAggregationStatus.TIME_OUT)) { - if (report.getLogAggregationStatus().equals( - LogAggregationStatus.SUCCEEDED) - || report.getLogAggregationStatus().equals( - LogAggregationStatus.FAILED)) { - curReport.setLogAggregationStatus(report - .getLogAggregationStatus()); + if (isLogAggregationFinishedForNM(report)) { + if (!isLogAggregationFinishedForNM(curReport)) { + stateChangedToFinal = true; } - } else { - curReport.setLogAggregationStatus(report.getLogAggregationStatus()); } - - if (report.getDiagnosticMessage() != null - && !report.getDiagnosticMessage().isEmpty()) { - curReport - .setDiagnosticMessage(curReport.getDiagnosticMessage() == null - ? report.getDiagnosticMessage() : curReport - .getDiagnosticMessage() + report.getDiagnosticMessage()); + if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING + || curReport.getLogAggregationStatus() != + LogAggregationStatus.RUNNING_WITH_FAILURE) { + if (curReport.getLogAggregationStatus() + == LogAggregationStatus.TIME_OUT + && report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING) { + // If the log aggregation status got from latest nm heartbeat + // is Running, and current log aggregation status is TimeOut, + // based on whether there are any failure messages for this NM, + // we will reset the log aggregation status as RUNNING or + // RUNNING_WITH_FAILURE + if (logAggregationFailureMessagesForNMs.get(nodeId) != null && + !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) { + report.setLogAggregationStatus( + LogAggregationStatus.RUNNING_WITH_FAILURE); + } + } + curReport.setLogAggregationStatus(report + .getLogAggregationStatus()); } } + updateLogAggregationDiagnosticMessages(nodeId, report); + if (isAppInFinalState(this) && stateChangedToFinal) { + updateLogAggregationStatus(nodeId); + } } } finally { this.writeLock.unlock(); @@ -1458,29 +1492,32 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { @Override public LogAggregationStatus getLogAggregationStatusForAppReport() { - if (!logAggregationEnabled) { - return LogAggregationStatus.DISABLED; - } - if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED - || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) { - return this.logAggregationStatusForAppReport; - } try { this.readLock.lock(); + if (! logAggregationEnabled) { + return LogAggregationStatus.DISABLED; + } + if (isLogAggregationFinished()) { + return this.logAggregationStatusForAppReport; + } Map reports = getLogAggregationReportsForApp(); if (reports.size() == 0) { - return null; + return this.logAggregationStatusForAppReport; } int logNotStartCount = 0; int logCompletedCount = 0; int logTimeOutCount = 0; int logFailedCount = 0; + int logRunningWithFailure = 0; for (Entry report : reports.entrySet()) { switch (report.getValue().getLogAggregationStatus()) { case NOT_START: logNotStartCount++; break; + case RUNNING_WITH_FAILURE: + logRunningWithFailure ++; + break; case SUCCEEDED: logCompletedCount++; break; @@ -1506,19 +1543,122 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } + } else if (logRunningWithFailure > 0) { + return LogAggregationStatus.RUNNING_WITH_FAILURE; } return LogAggregationStatus.RUNNING; } finally { this.readLock.unlock(); } } + + private boolean isLogAggregationFinished() { + return this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.SUCCEEDED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.FAILED); + + } + + private boolean isLogAggregationFinishedForNM(LogAggregationReport report) { + return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED + || report.getLogAggregationStatus() == LogAggregationStatus.FAILED; + } + + private void updateLogAggregationDiagnosticMessages(NodeId nodeId, + LogAggregationReport report) { + if (report.getDiagnosticMessage() != null + && !report.getDiagnosticMessage().isEmpty()) { + if (report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING ) { + List diagnostics = logAggregationDiagnosticsForNMs.get(nodeId); + if (diagnostics == null) { + diagnostics = new ArrayList(); + logAggregationDiagnosticsForNMs.put(nodeId, diagnostics); + } else { + if (diagnostics.size() + == maxLogAggregationDiagnosticsInMemory) { + diagnostics.remove(0); + } + } + diagnostics.add(report.getDiagnosticMessage()); + this.logAggregationStatus.get(nodeId).setDiagnosticMessage( + StringUtils.join(diagnostics, "\n")); + } else if (report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING_WITH_FAILURE) { + List failureMessages = + logAggregationFailureMessagesForNMs.get(nodeId); + if (failureMessages == null) { + failureMessages = new ArrayList(); + logAggregationFailureMessagesForNMs.put(nodeId, failureMessages); + } else { + if (failureMessages.size() + == maxLogAggregationDiagnosticsInMemory) { + failureMessages.remove(0); + } + } + failureMessages.add(report.getDiagnosticMessage()); + } + } + } + + private void updateLogAggregationStatus(NodeId nodeId) { + LogAggregationStatus status = + this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); + if (status.equals(LogAggregationStatus.SUCCEEDED)) { + this.logAggregationSucceed++; + } else if (status.equals(LogAggregationStatus.FAILED)) { + this.logAggregationFailed++; + } + if (this.logAggregationSucceed == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; + // Since the log aggregation status for this application for all NMs + // is SUCCEEDED, it means all logs are aggregated successfully. + // We could remove all the cached log aggregation reports + this.logAggregationStatus.clear(); + this.logAggregationDiagnosticsForNMs.clear(); + this.logAggregationFailureMessagesForNMs.clear(); + } else if (this.logAggregationSucceed + this.logAggregationFailed + == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; + // We have collected the log aggregation status for all NMs. + // The log aggregation status is FAILED which means the log + // aggregation fails in some NMs. We are only interested in the + // nodes where the log aggregation is failed. So we could remove + // the log aggregation details for those succeeded NMs + for (Iterator> it = + this.logAggregationStatus.entrySet().iterator(); it.hasNext();) { + Map.Entry entry = it.next(); + if (entry.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED)) { + it.remove(); + } + } + // the log aggregation has finished/failed. + // and the status will not be updated anymore. + this.logAggregationDiagnosticsForNMs.clear(); + } + } + + public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { + try { + this.readLock.lock(); + List failureMessages = + this.logAggregationFailureMessagesForNMs.get(nodeId); + if (failureMessages == null || failureMessages.isEmpty()) { + return StringUtils.EMPTY; + } + return StringUtils.join(failureMessages, "\n"); + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3be1867c9a..a11aacf042 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -22,8 +22,6 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -777,7 +775,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.handleContainerStatus(statusEvent.getContainers()); - Map logAggregationReportsForApps = + List logAggregationReportsForApps = statusEvent.getLogAggregationReportsForApps(); if (logAggregationReportsForApps != null && !logAggregationReportsForApps.isEmpty()) { @@ -915,12 +913,11 @@ private void handleContainerStatus(List containerStatuses) { } private void handleLogAggregationStatus( - Map logAggregationReportsForApps) { - for (Entry report : - logAggregationReportsForApps.entrySet()) { - RMApp rmApp = this.context.getRMApps().get(report.getKey()); + List logAggregationReportsForApps) { + for (LogAggregationReport report : logAggregationReportsForApps) { + RMApp rmApp = this.context.getRMApps().get(report.getApplicationId()); if (rmApp != null) { - ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue()); + ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 4bbf6100df..b95d7d3e77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; -import java.util.Map; - import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -34,7 +32,7 @@ public class RMNodeStatusEvent extends RMNodeEvent { private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; - private Map logAggregationReportsForApps; + private List logAggregationReportsForApps; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, @@ -50,7 +48,7 @@ public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, NodeHeartbeatResponse latestResponse, - Map logAggregationReportsForApps) { + List logAggregationReportsForApps) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; @@ -75,13 +73,12 @@ public List getKeepAliveAppIds() { return this.keepAliveAppIds; } - public Map - getLogAggregationReportsForApps() { + public List getLogAggregationReportsForApps() { return this.logAggregationReportsForApps; } public void setLogAggregationReportsForApps( - Map logAggregationReportsForApps) { + List logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java index 43e26be1fc..38e0e3bf72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.AppBlock; -import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; @@ -170,4 +170,13 @@ protected void generateApplicationTable(Block html, tbody._()._(); } + + @Override + protected LogAggregationStatus getLogAggregationStatus() { + RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID); + if (rmApp == null) { + return null; + } + return rmApp.getLogAggregationStatusForAppReport(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java index a2f61e348b..f7f7c97178 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; @@ -93,6 +94,9 @@ protected void render(Block html) { .td("Log Aggregation does not Start.")._(); table_description.tr().td(LogAggregationStatus.RUNNING.name()) .td("Log Aggregation is Running.")._(); + table_description.tr().td(LogAggregationStatus.RUNNING_WITH_FAILURE.name()) + .td("Log Aggregation is Running, but has failures " + + "in previous cycles")._(); table_description.tr().td(LogAggregationStatus.SUCCEEDED.name()) .td("Log Aggregation is Succeeded. All of the logs have been " + "aggregated successfully.")._(); @@ -106,24 +110,29 @@ protected void render(Block html) { table_description._(); div_description._(); - boolean logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + RMApp rmApp = rm.getRMContext().getRMApps().get(appId); // Application Log aggregation status Table DIV div = html.div(_INFO_WRAP); TABLE> table = div.h3( "Log Aggregation: " - + (logAggregationEnabled ? "Enabled" : "Disabled")).table( + + (rmApp == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp + .getLogAggregationStatusForAppReport().name())).table( "#LogAggregationStatus"); - table. - tr(). - th(_TH, "NodeId"). - th(_TH, "Log Aggregation Status"). - th(_TH, "Diagnostis Message"). - _(); - RMApp rmApp = rm.getRMContext().getRMApps().get(appId); + int maxLogAggregationDiagnosticsInMemory = conf.getInt( + YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, + YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + table + .tr() + .th(_TH, "NodeId") + .th(_TH, "Log Aggregation Status") + .th(_TH, "Last " + + maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages") + .th(_TH, "Last " + + maxLogAggregationDiagnosticsInMemory + " Failure Messages")._(); + if (rmApp != null) { Map logAggregationReports = rmApp.getLogAggregationReportsForApp(); @@ -136,10 +145,14 @@ protected void render(Block html) { String message = report.getValue() == null ? null : report.getValue() .getDiagnosticMessage(); + String failureMessage = + report.getValue() == null ? null : ((RMAppImpl)rmApp) + .getLogAggregationFailureMessagesForNM(report.getKey()); table.tr() .td(report.getKey().toString()) .td(status == null ? "N/A" : status.toString()) - .td(message == null ? "N/A" : message)._(); + .td(message == null ? "N/A" : message) + .td(failureMessage == null ? "N/A" : failureMessage)._(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 4eec63f679..9af4290f5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock; import java.util.ArrayList; -import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -155,26 +155,26 @@ public void testLogAggregationStatus() throws Exception { .getLogAggregationStatus()); } - Map node1ReportForApp = - new HashMap(); + List node1ReportForApp = + new ArrayList(); String messageForNode1_1 = "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1 = - LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.RUNNING, messageForNode1_1); - node1ReportForApp.put(appId, report1); + LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, + messageForNode1_1); + node1ReportForApp.add(report1); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp)); - Map node2ReportForApp = - new HashMap(); + List node2ReportForApp = + new ArrayList(); String messageForNode2_1 = "node2 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report2 = - LogAggregationReport.newInstance(appId, nodeId2, + LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode2_1); - node2ReportForApp.put(appId, report2); + node2ReportForApp.add(report2); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node2ReportForApp)); @@ -205,14 +205,14 @@ public void testLogAggregationStatus() throws Exception { } // node1 updates its log aggregation status again - Map node1ReportForApp2 = - new HashMap(); + List node1ReportForApp2 = + new ArrayList(); String messageForNode1_2 = "node1 logAggregation status updated at " + System.currentTimeMillis(); LogAggregationReport report1_2 = - LogAggregationReport.newInstance(appId, nodeId1, + LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_2); - node1ReportForApp2.put(appId, report1_2); + node1ReportForApp2.add(report1_2); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp2)); @@ -230,8 +230,9 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report - .getValue().getDiagnosticMessage()); + Assert.assertEquals( + messageForNode1_1 + "\n" + messageForNode1_2, report + .getValue().getDiagnosticMessage()); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue() .getLogAggregationStatus()); @@ -268,15 +269,19 @@ public void testLogAggregationStatus() throws Exception { // Finally, node1 finished its log aggregation and sent out its final // log aggregation status. The log aggregation status for node1 should // be changed from TIME_OUT to SUCCEEDED - Map node1ReportForApp3 = - new HashMap(); - String messageForNode1_3 = - "node1 final logAggregation status updated at " - + System.currentTimeMillis(); - LogAggregationReport report1_3 = - LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.SUCCEEDED, messageForNode1_3); - node1ReportForApp3.put(appId, report1_3); + List node1ReportForApp3 = + new ArrayList(); + LogAggregationReport report1_3; + for (int i = 0; i < 10 ; i ++) { + report1_3 = + LogAggregationReport.newInstance(appId, + LogAggregationStatus.RUNNING, "test_message_" + i); + node1ReportForApp3.add(report1_3); + } + node1ReportForApp3.add(LogAggregationReport.newInstance(appId, + LogAggregationStatus.SUCCEEDED, "")); + // For every logAggregationReport cached in memory, we can only save at most + // 10 diagnostic messages/failure messages node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, null, node1ReportForApp3)); @@ -290,8 +295,14 @@ public void testLogAggregationStatus() throws Exception { if (report.getKey().equals(node1.getNodeID())) { Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue() .getLogAggregationStatus()); - Assert.assertEquals(messageForNode1_1 + messageForNode1_2 - + messageForNode1_3, report.getValue().getDiagnosticMessage()); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 9; i ++) { + builder.append("test_message_" + i); + builder.append("\n"); + } + builder.append("test_message_" + 9); + Assert.assertEquals(builder.toString(), report.getValue() + .getDiagnosticMessage()); } else if (report.getKey().equals(node2.getNodeID())) { Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue() .getLogAggregationStatus()); @@ -301,6 +312,32 @@ public void testLogAggregationStatus() throws Exception { .fail("should not contain log aggregation report for other nodes"); } } + + // update log aggregationStatus for node2 as FAILED, + // so the log aggregation status for the App will become FAILED, + // and we only keep the log aggregation reports whose status is FAILED, + // so the log aggregation report for node1 will be removed. + List node2ReportForApp2 = + new ArrayList(); + LogAggregationReport report2_2 = + LogAggregationReport.newInstance(appId, + LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message"); + LogAggregationReport report2_3 = + LogAggregationReport.newInstance(appId, + LogAggregationStatus.FAILED, ""); + node2ReportForApp2.add(report2_2); + node2ReportForApp2.add(report2_3); + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus + .newInstance(true, null, 0), new ArrayList(), null, + null, node2ReportForApp2)); + Assert.assertEquals(LogAggregationStatus.FAILED, + rmApp.getLogAggregationStatusForAppReport()); + logAggregationStatus = rmApp.getLogAggregationReportsForApp(); + Assert.assertTrue(logAggregationStatus.size() == 1); + Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID())); + Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID())); + Assert.assertEquals("Fail_Message", + ((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2)); } @Test (timeout = 10000) @@ -317,9 +354,11 @@ public void testGetLogAggregationStatusForAppReport() { // Enable the log aggregation conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); rmApp = (RMAppImpl)createRMApp(conf); - // If we do not know any NodeManagers for this application , - // the log aggregation status will return null - Assert.assertNull(rmApp.getLogAggregationStatusForAppReport()); + // If we do not know any NodeManagers for this application , and + // the log aggregation is enabled, the log aggregation status will + // return NOT_START + Assert.assertEquals(LogAggregationStatus.NOT_START, + rmApp.getLogAggregationStatusForAppReport()); NodeId nodeId1 = NodeId.newInstance("localhost", 1111); NodeId nodeId2 = NodeId.newInstance("localhost", 2222); @@ -329,24 +368,24 @@ public void testGetLogAggregationStatusForAppReport() { // If the log aggregation status for all NMs are NOT_START, // the log aggregation status for this app will return NOT_START rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); Assert.assertEquals(LogAggregationStatus.NOT_START, rmApp.getLogAggregationStatusForAppReport()); rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, "")); + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); Assert.assertEquals(LogAggregationStatus.RUNNING, rmApp.getLogAggregationStatusForAppReport()); @@ -357,13 +396,13 @@ public void testGetLogAggregationStatusForAppReport() { // others are SUCCEEDED, the log aggregation status for this app will // return TIME_OUT rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); @@ -371,17 +410,59 @@ public void testGetLogAggregationStatusForAppReport() { // is at the final state, the log aggregation status for this app will // return SUCCEEDED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); Assert.assertEquals(LogAggregationStatus.SUCCEEDED, rmApp.getLogAggregationStatusForAppReport()); rmApp = (RMAppImpl)createRMApp(conf); + // If the log aggregation status for at least one of NMs are RUNNING, + // the log aggregation status for this app will return RUNNING + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + Assert.assertEquals(LogAggregationStatus.RUNNING, + rmApp.getLogAggregationStatusForAppReport()); + + // If the log aggregation status for at least one of NMs + // are RUNNING_WITH_FAILURE, the log aggregation status + // for this app will return RUNNING_WITH_FAILURE + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE, + "")); + Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE, + rmApp.getLogAggregationStatusForAppReport()); + + // For node4, the previous log aggregation status is RUNNING_WITH_FAILURE, + // it will not be changed even it get a new log aggregation status + // as RUNNING + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), LogAggregationStatus.RUNNING, "")); + Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE, + rmApp.getLogAggregationStatusForAppReport()); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); // If at least of one log aggregation status for one NM is FAILED, @@ -389,13 +470,13 @@ public void testGetLogAggregationStatusForAppReport() { // at the final state, the log aggregation status for this app // will return FAILED rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, "")); rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, "")); rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.getApplicationId(), LogAggregationStatus.FAILED, "")); rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( - rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.getApplicationId(), LogAggregationStatus.FAILED, "")); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport());