From 1db355a875c3ecc40a244045c6812e00c8d36ef1 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Fri, 17 Apr 2015 13:18:59 -0700 Subject: [PATCH] YARN-1402. Update related Web UI and CLI with exposing client API to check log aggregation status. Contributed by Xuan Gong. --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/records/ApplicationReport.java | 14 ++- .../api/records/LogAggregationStatus.java | 28 ++++- .../src/main/proto/yarn_protos.proto | 10 ++ .../yarn/client/cli/ApplicationCLI.java | 3 + .../hadoop/yarn/client/cli/TestYarnCLI.java | 3 + .../impl/pb/ApplicationReportPBImpl.java | 33 ++++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 17 +++ .../protocolrecords/LogAggregationReport.java | 2 +- .../impl/pb/LogAggregationReportPBImpl.java | 13 +-- .../proto/yarn_server_common_protos.proto | 11 +- .../logaggregation/AppLogAggregatorImpl.java | 4 +- .../server/resourcemanager/rmapp/RMApp.java | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 99 ++++++++++++++-- .../RMAppLogAggregationStatusBlock.java | 11 +- .../resourcemanager/webapp/dao/AppInfo.java | 9 +- .../applicationsmanager/MockAsm.java | 6 + .../TestRMAppLogAggregationStatus.java | 106 +++++++++++++++++- .../resourcemanager/rmapp/MockRMApp.java | 6 + .../webapp/TestRMWebServicesApps.java | 14 ++- 20 files changed, 346 insertions(+), 49 deletions(-) rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server => hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn}/api/records/LogAggregationStatus.java (60%) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c75f0a7883..c4d7d18c96 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -84,6 +84,9 @@ Release 2.8.0 - UNRELEASED YARN-3354. Add node label expression in ContainerTokenIdentifier to support RM recovery. (Wangda Tan via jianhe) + YARN-1402. Update related Web UI and CLI with exposing client API to check + log aggregation status. (Xuan Gong via junping_du) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java index ff4fb52a55..e5d7254565 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationReport.java @@ -361,5 +361,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId, @Public @Stable public abstract Token getAMRMToken(); - + + /** + * Get log aggregation status for the application + * @return Application's log aggregation status + */ + @Public + @Stable + public abstract LogAggregationStatus getLogAggregationStatus(); + + @Private + @Unstable + public abstract void setLogAggregationStatus( + LogAggregationStatus logAggregationStatus); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java similarity index 60% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java index 496767fcab..da1230c27f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java @@ -16,16 +16,40 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.api.records; +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** *

Status of Log aggregation.

*/ public enum LogAggregationStatus { + + /** Log Aggregation is Disabled. */ DISABLED, + + /** Log Aggregation does not Start. */ NOT_START, + + /** Log Aggregation is Running. */ RUNNING, - FINISHED, + + /** + * Log Aggregation is Succeeded. All of the logs have been aggregated + * successfully. + */ + SUCCEEDED, + + /** + * Log Aggregation is completed. But at least one of the logs have not been + * aggregated. + */ FAILED, + + /** + * The application is finished, but the log aggregation status is not updated + * for a long time. + * @see YarnConfiguration#LOG_AGGREGATION_STATUS_TIME_OUT_MS + */ TIME_OUT } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 7781d65716..a0491fe168 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -194,6 +194,16 @@ message ApplicationReportProto { optional string applicationType = 18; optional hadoop.common.TokenProto am_rm_token = 19; repeated string applicationTags = 20; + optional LogAggregationStatusProto log_aggregation_status = 21; +} + +enum LogAggregationStatusProto { + LOG_DISABLED = 1; + LOG_NOT_START = 2; + LOG_RUNNING = 3; + LOG_SUCCEEDED = 4; + LOG_FAILED = 5; + LOG_TIME_OUT = 6; } message ApplicationAttemptReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index dd4a949ed4..8ef88c39ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -530,6 +530,9 @@ private int printApplicationReport(String applicationId) } else { appReportStr.println("N/A"); } + appReportStr.print("\tLog Aggregation Status : "); + appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A" + : appReport.getLogAggregationStatus()); appReportStr.print("\tDiagnostics : "); appReportStr.print(appReport.getDiagnostics()); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index b8be88d363..003f086f9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -104,6 +105,7 @@ public void testGetApplicationReport() throws Exception { YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, usageReport, "N/A", 0.53789f, "YARN", null); + newApplicationReport.setLogAggregationStatus(LogAggregationStatus.SUCCEEDED); when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( newApplicationReport); int result = cli.run(new String[] { "application", "-status", applicationId.toString() }); @@ -127,6 +129,7 @@ public void testGetApplicationReport() throws Exception { pw.println("\tAM Host : host"); pw.println("\tAggregate Resource Allocation : " + (i == 0 ? "N/A" : "123456 MB-seconds, 4567 vcore-seconds")); + pw.println("\tLog Aggregation Status : SUCCEEDED"); pw.println("\tDiagnostics : diagnostics"); pw.close(); String appReportStr = baos.toString("UTF-8"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java index dd3e2bc213..751dd90e16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import com.google.protobuf.TextFormat; @@ -548,4 +550,35 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } + + @Override + public LogAggregationStatus getLogAggregationStatus() { + ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasLogAggregationStatus()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationStatus()); + } + + @Override + public void setLogAggregationStatus( + LogAggregationStatus logAggregationStatus) { + maybeInitBuilder(); + if (logAggregationStatus == null) { + builder.clearLogAggregationStatus(); + return; + } + builder.setLogAggregationStatus( + convertToProtoFormat(logAggregationStatus)); + } + + private LogAggregationStatus convertFromProtoFormat( + LogAggregationStatusProto s) { + return ProtoUtils.convertFromProtoFormat(s); + } + + private LogAggregationStatusProto + convertToProtoFormat(LogAggregationStatus s) { + return ProtoUtils.convertToProtoFormat(s); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 586e9dd365..4e8a19c65d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto; @@ -253,4 +255,19 @@ public static ReservationRequestInterpreter convertFromProtoFormat( return ReservationRequestInterpreter.valueOf(e.name()); } + /* + * Log Aggregation Status + */ + private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; + public static LogAggregationStatusProto convertToProtoFormat( + LogAggregationStatus e) { + return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX + + e.name()); + } + + public static LogAggregationStatus convertFromProtoFormat( + LogAggregationStatusProto e) { + return LogAggregationStatus.valueOf(e.name().replace( + LOG_AGGREGATION_STATUS_PREFIX, "")); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java index 808804b687..b2270d82fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java @@ -21,8 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.util.Records; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java index 7999fa7aad..75b6eab6fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java @@ -21,16 +21,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import com.google.protobuf.TextFormat; @@ -43,8 +44,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport { LogAggregationReportProto.Builder builder = null; boolean viaProto = false; - private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_"; - private ApplicationId applicationId; private NodeId nodeId; @@ -166,14 +165,12 @@ public LogAggregationStatus getLogAggregationStatus() { private LogAggregationStatus convertFromProtoFormat( LogAggregationStatusProto s) { - return LogAggregationStatus.valueOf(s.name().replace( - LOGAGGREGATION_STATUS_PREFIX, "")); + return ProtoUtils.convertFromProtoFormat(s); } private LogAggregationStatusProto convertToProtoFormat(LogAggregationStatus s) { - return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX - + s.name()); + return ProtoUtils.convertToProtoFormat(s); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 6e9f4cb8c5..99149ac15e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -52,13 +52,4 @@ message NodeHealthStatusProto { message VersionProto { optional int32 major_version = 1; optional int32 minor_version = 2; -} - -enum LogAggregationStatusProto { - LOG_DISABLED = 1; - LOG_NOT_START = 2; - LOG_RUNNING = 3; - LOG_FINISHED = 4; - LOG_TIME_OUT = 5; -} - +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index bf7d5f8b72..3f09e5d337 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -58,7 +59,6 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -347,7 +347,7 @@ public Object run() throws Exception { report.setDiagnosticMessage(diagnosticMessage); if (appFinished) { report.setLogAggregationStatus(renameTemporaryLogFileFailed - ? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED); + ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); } else { report.setLogAggregationStatus(LogAggregationStatus.RUNNING); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 33eedbf60a..be9dfaf91a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -245,4 +246,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, ResourceRequest getAMResourceRequest(); Map getLogAggregationReportsForApp(); + + LogAggregationStatus getLogAggregationStatusForAppReport(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 47c4807a70..b4e4965c3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -64,7 +65,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -151,6 +151,7 @@ public class RMAppImpl implements RMApp, Recoverable { private final long logAggregationStatusTimeout; private final Map logAggregationStatus = new HashMap(); + private LogAggregationStatus logAggregationStatusForAppReport; // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -578,6 +579,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, String trackingUrl = UNAVAILABLE; String host = UNAVAILABLE; String origTrackingUrl = UNAVAILABLE; + LogAggregationStatus logAggregationStatus = null; int rpcPort = -1; ApplicationResourceUsageReport appUsageReport = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; @@ -608,6 +610,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); progress = currentAttempt.getProgress(); + logAggregationStatus = this.getLogAggregationStatusForAppReport(); } diags = this.diagnostics.toString(); @@ -635,13 +638,15 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, DUMMY_APPLICATION_ATTEMPT_NUMBER); } - return BuilderUtils.newApplicationReport(this.applicationId, - currentApplicationAttemptId, this.user, this.queue, - this.name, host, rpcPort, clientToAMToken, + ApplicationReport report = BuilderUtils.newApplicationReport( + this.applicationId, currentApplicationAttemptId, this.user, + this.queue, this.name, host, rpcPort, clientToAMToken, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, appUsageReport, origTrackingUrl, progress, this.applicationType, amrmToken, applicationTags); + report.setLogAggregationStatus(logAggregationStatus); + return report; } finally { this.readLock.unlock(); } @@ -827,11 +832,13 @@ public void transition(RMAppImpl app, RMAppEvent event) { // otherwise, add it to ranNodes for further process app.ranNodes.add(nodeAddedEvent.getNodeId()); - app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), - LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent - .getNodeId(), app.logAggregationEnabled - ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, - "")); + if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { + app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), + LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent + .getNodeId(), app.logAggregationEnabled + ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, + "")); + } }; } @@ -1398,7 +1405,9 @@ public Map getLogAggregationReportsForApp() { if (!output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FINISHED) + .equals(LogAggregationStatus.SUCCEEDED) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FAILED) && isAppInFinalState(this) && System.currentTimeMillis() > this.logAggregationStartTime + this.logAggregationStatusTimeout) { @@ -1423,7 +1432,9 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { if (curReport.getLogAggregationStatus().equals( LogAggregationStatus.TIME_OUT)) { if (report.getLogAggregationStatus().equals( - LogAggregationStatus.FINISHED)) { + LogAggregationStatus.SUCCEEDED) + || report.getLogAggregationStatus().equals( + LogAggregationStatus.FAILED)) { curReport.setLogAggregationStatus(report .getLogAggregationStatus()); } @@ -1444,4 +1455,70 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { this.writeLock.unlock(); } } + + @Override + public LogAggregationStatus getLogAggregationStatusForAppReport() { + if (!logAggregationEnabled) { + return LogAggregationStatus.DISABLED; + } + if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED + || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) { + return this.logAggregationStatusForAppReport; + } + try { + this.readLock.lock(); + Map reports = + getLogAggregationReportsForApp(); + if (reports.size() == 0) { + return null; + } + int logNotStartCount = 0; + int logCompletedCount = 0; + int logTimeOutCount = 0; + int logFailedCount = 0; + for (Entry report : reports.entrySet()) { + switch (report.getValue().getLogAggregationStatus()) { + case NOT_START: + logNotStartCount++; + break; + case SUCCEEDED: + logCompletedCount++; + break; + case FAILED: + logFailedCount++; + logCompletedCount++; + break; + case TIME_OUT: + logTimeOutCount++; + logCompletedCount++; + break; + default: + break; + } + } + if (logNotStartCount == reports.size()) { + return LogAggregationStatus.NOT_START; + } else if (logCompletedCount == reports.size()) { + // We should satisfy two condition in order to return SUCCEEDED or FAILED + // 1) make sure the application is in final state + // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT + // The SUCCEEDED/FAILED status is the final status which means + // the log aggregation is finished. And the log aggregation status will + // not be updated anymore. + if (logFailedCount > 0 && isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; + return LogAggregationStatus.FAILED; + } else if (logTimeOutCount > 0) { + return LogAggregationStatus.TIME_OUT; + } + if (isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED; + return LogAggregationStatus.SUCCEEDED; + } + } + return LogAggregationStatus.RUNNING; + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java index a95f76f851..a2f61e348b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java @@ -30,10 +30,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.util.Apps; @@ -93,15 +93,16 @@ protected void render(Block html) { .td("Log Aggregation does not Start.")._(); table_description.tr().td(LogAggregationStatus.RUNNING.name()) .td("Log Aggregation is Running.")._(); - table_description.tr().td(LogAggregationStatus.FINISHED.name()) - .td("Log Aggregation is Finished. All of the logs have been " + table_description.tr().td(LogAggregationStatus.SUCCEEDED.name()) + .td("Log Aggregation is Succeeded. All of the logs have been " + "aggregated successfully.")._(); table_description.tr().td(LogAggregationStatus.FAILED.name()) .td("Log Aggregation is Failed. At least one of the logs " + "have not been aggregated.")._(); table_description.tr().td(LogAggregationStatus.TIME_OUT.name()) - .td("Does not get the Log aggregation status for a long time. " - + "Not sure what is the current Log Aggregation Status.")._(); + .td("The application is finished, but the log aggregation status is " + + "not updated for a long time. Not sure whether the log aggregation " + + "is finished or not.")._(); table_description._(); div_description._(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 79b2248281..bd3b046ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -94,6 +95,8 @@ public class AppInfo { protected List resourceRequests; + protected LogAggregationStatus logAggregationStatus; + public AppInfo() { } // JAXB needs this @@ -141,7 +144,7 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess, this.finishedTime = app.getFinishTime(); this.elapsedTime = Times.elapsed(app.getStartTime(), app.getFinishTime()); - + this.logAggregationStatus = app.getLogAggregationStatusForAppReport(); RMAppAttempt attempt = app.getCurrentAppAttempt(); if (attempt != null) { Container masterContainer = attempt.getMasterContainer(); @@ -314,4 +317,8 @@ public long getVcoreSeconds() { public List getResourceRequests() { return this.resourceRequests; } + + public LogAggregationStatus getLogAggregationStatus() { + return this.logAggregationStatus; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index a6e469e418..a23c789ed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -196,6 +197,11 @@ public ResourceRequest getAMResourceRequest() { public Map getLogAggregationReportsForApp() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public LogAggregationStatus getLogAggregationStatusForAppReport() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 7397d38e63..4eec63f679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -38,7 +39,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; @@ -267,7 +267,7 @@ public void testLogAggregationStatus() throws Exception { // Finally, node1 finished its log aggregation and sent out its final // log aggregation status. The log aggregation status for node1 should - // be changed from TIME_OUT to Finished + // be changed from TIME_OUT to SUCCEEDED Map node1ReportForApp3 = new HashMap(); String messageForNode1_3 = @@ -275,7 +275,7 @@ public void testLogAggregationStatus() throws Exception { + System.currentTimeMillis(); LogAggregationReport report1_3 = LogAggregationReport.newInstance(appId, nodeId1, - LogAggregationStatus.FINISHED, messageForNode1_3); + LogAggregationStatus.SUCCEEDED, messageForNode1_3); node1ReportForApp3.put(appId, report1_3); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus .newInstance(true, null, 0), new ArrayList(), null, @@ -288,7 +288,7 @@ public void testLogAggregationStatus() throws Exception { for (Entry report : logAggregationStatus .entrySet()) { if (report.getKey().equals(node1.getNodeID())) { - Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue() + Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue() .getLogAggregationStatus()); Assert.assertEquals(messageForNode1_1 + messageForNode1_2 + messageForNode1_3, report.getValue().getDiagnosticMessage()); @@ -303,6 +303,104 @@ public void testLogAggregationStatus() throws Exception { } } + @Test (timeout = 10000) + public void testGetLogAggregationStatusForAppReport() { + YarnConfiguration conf = new YarnConfiguration(); + + // Disable the log aggregation + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + RMAppImpl rmApp = (RMAppImpl)createRMApp(conf); + // The log aggregation status should be DISABLED. + Assert.assertEquals(LogAggregationStatus.DISABLED, + rmApp.getLogAggregationStatusForAppReport()); + + // Enable the log aggregation + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + rmApp = (RMAppImpl)createRMApp(conf); + // If we do not know any NodeManagers for this application , + // the log aggregation status will return null + Assert.assertNull(rmApp.getLogAggregationStatusForAppReport()); + + NodeId nodeId1 = NodeId.newInstance("localhost", 1111); + NodeId nodeId2 = NodeId.newInstance("localhost", 2222); + NodeId nodeId3 = NodeId.newInstance("localhost", 3333); + NodeId nodeId4 = NodeId.newInstance("localhost", 4444); + + // If the log aggregation status for all NMs are NOT_START, + // the log aggregation status for this app will return NOT_START + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + Assert.assertEquals(LogAggregationStatus.NOT_START, + rmApp.getLogAggregationStatusForAppReport()); + + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + Assert.assertEquals(LogAggregationStatus.RUNNING, + rmApp.getLogAggregationStatusForAppReport()); + + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); + Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); + + // If at least of one log aggregation status for one NM is TIME_OUT, + // others are SUCCEEDED, the log aggregation status for this app will + // return TIME_OUT + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + Assert.assertEquals(LogAggregationStatus.TIME_OUT, + rmApp.getLogAggregationStatusForAppReport()); + + // If the log aggregation status for all NMs are SUCCEEDED and Application + // is at the final state, the log aggregation status for this app will + // return SUCCEEDED + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + Assert.assertEquals(LogAggregationStatus.SUCCEEDED, + rmApp.getLogAggregationStatusForAppReport()); + + rmApp = (RMAppImpl)createRMApp(conf); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); + Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp)); + // If at least of one log aggregation status for one NM is FAILED, + // others are either SUCCEEDED or TIME_OUT, and this application is + // at the final state, the log aggregation status for this app + // will return FAILED + rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, "")); + rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, "")); + rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance( + rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, "")); + Assert.assertEquals(LogAggregationStatus.FAILED, + rmApp.getLogAggregationStatusForAppReport()); + + } + private RMApp createRMApp(Configuration conf) { ApplicationSubmissionContext submissionContext = ApplicationSubmissionContext.newInstance(appId, "test", "default", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 81de286b36..c6ee3baa20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -277,4 +278,9 @@ public ResourceRequest getAMResourceRequest() { public Map getLogAggregationReportsForApp() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public LogAggregationStatus getLogAggregationStatusForAppReport() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index bd43c554ce..549b9e0ac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -1307,14 +1307,15 @@ public void verifyAppsXML(NodeList nodes, RMApp app) throws JSONException, WebServicesTestUtils.getXmlInt(element, "preemptedResourceMB"), WebServicesTestUtils.getXmlInt(element, "preemptedResourceVCores"), WebServicesTestUtils.getXmlInt(element, "numNonAMContainerPreempted"), - WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted")); + WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"), + WebServicesTestUtils.getXmlString(element, "logAggregationStatus")); } } public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException, Exception { - assertEquals("incorrect number of elements", 27, info.length()); + assertEquals("incorrect number of elements", 28, info.length()); verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"), info.getString("name"), info.getString("applicationType"), @@ -1329,7 +1330,8 @@ public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException, info.getInt("preemptedResourceMB"), info.getInt("preemptedResourceVCores"), info.getInt("numNonAMContainerPreempted"), - info.getInt("numAMContainerPreempted")); + info.getInt("numAMContainerPreempted"), + info.getString("logAggregationStatus")); } public void verifyAppInfoGeneric(RMApp app, String id, String user, @@ -1339,7 +1341,8 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user, long elapsedTime, String amHostHttpAddress, String amContainerLogs, int allocatedMB, int allocatedVCores, int numContainers, int preemptedResourceMB, int preemptedResourceVCores, - int numNonAMContainerPreempted, int numAMContainerPreempted) throws JSONException, + int numNonAMContainerPreempted, int numAMContainerPreempted, + String logAggregationStatus) throws JSONException, Exception { WebServicesTestUtils.checkStringMatch("id", app.getApplicationId() @@ -1386,6 +1389,9 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user, assertEquals("numAMContainerPreempted doesn't match", app .getRMAppMetrics().getNumAMContainersPreempted(), numAMContainerPreempted); + assertEquals("Log aggregation Status doesn't match", app + .getLogAggregationStatusForAppReport().toString(), + logAggregationStatus); } @Test