YARN-1402. Update related Web UI and CLI with exposing client API to check log aggregation status. Contributed by Xuan Gong.
This commit is contained in:
parent
c6b5203cfd
commit
1db355a875
@ -84,6 +84,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3354. Add node label expression in ContainerTokenIdentifier to support
|
||||
RM recovery. (Wangda Tan via jianhe)
|
||||
|
||||
YARN-1402. Update related Web UI and CLI with exposing client API to check
|
||||
log aggregation status. (Xuan Gong via junping_du)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||
|
@ -361,5 +361,17 @@ public static ApplicationReport newInstance(ApplicationId applicationId,
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Token getAMRMToken();
|
||||
|
||||
|
||||
/**
|
||||
* Get log aggregation status for the application
|
||||
* @return Application's log aggregation status
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract LogAggregationStatus getLogAggregationStatus();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setLogAggregationStatus(
|
||||
LogAggregationStatus logAggregationStatus);
|
||||
}
|
||||
|
@ -16,16 +16,40 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records;
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* <p>Status of Log aggregation.</p>
|
||||
*/
|
||||
public enum LogAggregationStatus {
|
||||
|
||||
/** Log Aggregation is Disabled. */
|
||||
DISABLED,
|
||||
|
||||
/** Log Aggregation does not Start. */
|
||||
NOT_START,
|
||||
|
||||
/** Log Aggregation is Running. */
|
||||
RUNNING,
|
||||
FINISHED,
|
||||
|
||||
/**
|
||||
* Log Aggregation is Succeeded. All of the logs have been aggregated
|
||||
* successfully.
|
||||
*/
|
||||
SUCCEEDED,
|
||||
|
||||
/**
|
||||
* Log Aggregation is completed. But at least one of the logs have not been
|
||||
* aggregated.
|
||||
*/
|
||||
FAILED,
|
||||
|
||||
/**
|
||||
* The application is finished, but the log aggregation status is not updated
|
||||
* for a long time.
|
||||
* @see YarnConfiguration#LOG_AGGREGATION_STATUS_TIME_OUT_MS
|
||||
*/
|
||||
TIME_OUT
|
||||
}
|
@ -194,6 +194,16 @@ message ApplicationReportProto {
|
||||
optional string applicationType = 18;
|
||||
optional hadoop.common.TokenProto am_rm_token = 19;
|
||||
repeated string applicationTags = 20;
|
||||
optional LogAggregationStatusProto log_aggregation_status = 21;
|
||||
}
|
||||
|
||||
enum LogAggregationStatusProto {
|
||||
LOG_DISABLED = 1;
|
||||
LOG_NOT_START = 2;
|
||||
LOG_RUNNING = 3;
|
||||
LOG_SUCCEEDED = 4;
|
||||
LOG_FAILED = 5;
|
||||
LOG_TIME_OUT = 6;
|
||||
}
|
||||
|
||||
message ApplicationAttemptReportProto {
|
||||
|
@ -530,6 +530,9 @@ private int printApplicationReport(String applicationId)
|
||||
} else {
|
||||
appReportStr.println("N/A");
|
||||
}
|
||||
appReportStr.print("\tLog Aggregation Status : ");
|
||||
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
|
||||
: appReport.getLogAggregationStatus());
|
||||
appReportStr.print("\tDiagnostics : ");
|
||||
appReportStr.print(appReport.getDiagnostics());
|
||||
} else {
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
@ -104,6 +105,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, usageReport, "N/A", 0.53789f, "YARN",
|
||||
null);
|
||||
newApplicationReport.setLogAggregationStatus(LogAggregationStatus.SUCCEEDED);
|
||||
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
|
||||
newApplicationReport);
|
||||
int result = cli.run(new String[] { "application", "-status", applicationId.toString() });
|
||||
@ -127,6 +129,7 @@ public void testGetApplicationReport() throws Exception {
|
||||
pw.println("\tAM Host : host");
|
||||
pw.println("\tAggregate Resource Allocation : " +
|
||||
(i == 0 ? "N/A" : "123456 MB-seconds, 4567 vcore-seconds"));
|
||||
pw.println("\tLog Aggregation Status : SUCCEEDED");
|
||||
pw.println("\tDiagnostics : diagnostics");
|
||||
pw.close();
|
||||
String appReportStr = baos.toString("UTF-8");
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
|
||||
@ -34,6 +35,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
@ -548,4 +550,35 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||
private TokenProto convertToProtoFormat(Token t) {
|
||||
return ((TokenPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatus() {
|
||||
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasLogAggregationStatus()) {
|
||||
return null;
|
||||
}
|
||||
return convertFromProtoFormat(p.getLogAggregationStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLogAggregationStatus(
|
||||
LogAggregationStatus logAggregationStatus) {
|
||||
maybeInitBuilder();
|
||||
if (logAggregationStatus == null) {
|
||||
builder.clearLogAggregationStatus();
|
||||
return;
|
||||
}
|
||||
builder.setLogAggregationStatus(
|
||||
convertToProtoFormat(logAggregationStatus));
|
||||
}
|
||||
|
||||
private LogAggregationStatus convertFromProtoFormat(
|
||||
LogAggregationStatusProto s) {
|
||||
return ProtoUtils.convertFromProtoFormat(s);
|
||||
}
|
||||
|
||||
private LogAggregationStatusProto
|
||||
convertToProtoFormat(LogAggregationStatus s) {
|
||||
return ProtoUtils.convertToProtoFormat(s);
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
@ -44,6 +45,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
|
||||
@ -253,4 +255,19 @@ public static ReservationRequestInterpreter convertFromProtoFormat(
|
||||
return ReservationRequestInterpreter.valueOf(e.name());
|
||||
}
|
||||
|
||||
/*
|
||||
* Log Aggregation Status
|
||||
*/
|
||||
private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_";
|
||||
public static LogAggregationStatusProto convertToProtoFormat(
|
||||
LogAggregationStatus e) {
|
||||
return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX
|
||||
+ e.name());
|
||||
}
|
||||
|
||||
public static LogAggregationStatus convertFromProtoFormat(
|
||||
LogAggregationStatusProto e) {
|
||||
return LogAggregationStatus.valueOf(e.name().replace(
|
||||
LOG_AGGREGATION_STATUS_PREFIX, ""));
|
||||
}
|
||||
}
|
||||
|
@ -21,8 +21,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
|
@ -21,16 +21,17 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@ -43,8 +44,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
|
||||
LogAggregationReportProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
|
||||
|
||||
private ApplicationId applicationId;
|
||||
private NodeId nodeId;
|
||||
|
||||
@ -166,14 +165,12 @@ public LogAggregationStatus getLogAggregationStatus() {
|
||||
|
||||
private LogAggregationStatus convertFromProtoFormat(
|
||||
LogAggregationStatusProto s) {
|
||||
return LogAggregationStatus.valueOf(s.name().replace(
|
||||
LOGAGGREGATION_STATUS_PREFIX, ""));
|
||||
return ProtoUtils.convertFromProtoFormat(s);
|
||||
}
|
||||
|
||||
private LogAggregationStatusProto
|
||||
convertToProtoFormat(LogAggregationStatus s) {
|
||||
return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
|
||||
+ s.name());
|
||||
return ProtoUtils.convertToProtoFormat(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,13 +52,4 @@ message NodeHealthStatusProto {
|
||||
message VersionProto {
|
||||
optional int32 major_version = 1;
|
||||
optional int32 minor_version = 2;
|
||||
}
|
||||
|
||||
enum LogAggregationStatusProto {
|
||||
LOG_DISABLED = 1;
|
||||
LOG_NOT_START = 2;
|
||||
LOG_RUNNING = 3;
|
||||
LOG_FINISHED = 4;
|
||||
LOG_TIME_OUT = 5;
|
||||
}
|
||||
|
||||
}
|
@ -49,6 +49,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -58,7 +59,6 @@
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
@ -347,7 +347,7 @@ public Object run() throws Exception {
|
||||
report.setDiagnosticMessage(diagnosticMessage);
|
||||
if (appFinished) {
|
||||
report.setLogAggregationStatus(renameTemporaryLogFileFailed
|
||||
? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED);
|
||||
? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
|
||||
} else {
|
||||
report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
|
||||
}
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
@ -245,4 +246,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
ResourceRequest getAMResourceRequest();
|
||||
|
||||
Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
|
||||
|
||||
LogAggregationStatus getLogAggregationStatusForAppReport();
|
||||
}
|
||||
|
@ -50,6 +50,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
@ -64,7 +65,6 @@
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||
@ -151,6 +151,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
private final long logAggregationStatusTimeout;
|
||||
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
|
||||
new HashMap<NodeId, LogAggregationReport>();
|
||||
private LogAggregationStatus logAggregationStatusForAppReport;
|
||||
|
||||
// These states stored are only valid when app is at killing or final_saving.
|
||||
private RMAppState stateBeforeKilling;
|
||||
@ -578,6 +579,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
String trackingUrl = UNAVAILABLE;
|
||||
String host = UNAVAILABLE;
|
||||
String origTrackingUrl = UNAVAILABLE;
|
||||
LogAggregationStatus logAggregationStatus = null;
|
||||
int rpcPort = -1;
|
||||
ApplicationResourceUsageReport appUsageReport =
|
||||
RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
|
||||
@ -608,6 +610,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
rpcPort = this.currentAttempt.getRpcPort();
|
||||
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
|
||||
progress = currentAttempt.getProgress();
|
||||
logAggregationStatus = this.getLogAggregationStatusForAppReport();
|
||||
}
|
||||
diags = this.diagnostics.toString();
|
||||
|
||||
@ -635,13 +638,15 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
DUMMY_APPLICATION_ATTEMPT_NUMBER);
|
||||
}
|
||||
|
||||
return BuilderUtils.newApplicationReport(this.applicationId,
|
||||
currentApplicationAttemptId, this.user, this.queue,
|
||||
this.name, host, rpcPort, clientToAMToken,
|
||||
ApplicationReport report = BuilderUtils.newApplicationReport(
|
||||
this.applicationId, currentApplicationAttemptId, this.user,
|
||||
this.queue, this.name, host, rpcPort, clientToAMToken,
|
||||
createApplicationState(), diags,
|
||||
trackingUrl, this.startTime, this.finishTime, finishState,
|
||||
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
||||
amrmToken, applicationTags);
|
||||
report.setLogAggregationStatus(logAggregationStatus);
|
||||
return report;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
@ -827,11 +832,13 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
// otherwise, add it to ranNodes for further process
|
||||
app.ranNodes.add(nodeAddedEvent.getNodeId());
|
||||
|
||||
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
|
||||
LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
|
||||
.getNodeId(), app.logAggregationEnabled
|
||||
? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
|
||||
""));
|
||||
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
|
||||
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
|
||||
LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
|
||||
.getNodeId(), app.logAggregationEnabled
|
||||
? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
|
||||
""));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -1398,7 +1405,9 @@ public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
|
||||
if (!output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.TIME_OUT)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.FINISHED)
|
||||
.equals(LogAggregationStatus.SUCCEEDED)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.FAILED)
|
||||
&& isAppInFinalState(this)
|
||||
&& System.currentTimeMillis() > this.logAggregationStartTime
|
||||
+ this.logAggregationStatusTimeout) {
|
||||
@ -1423,7 +1432,9 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
|
||||
if (curReport.getLogAggregationStatus().equals(
|
||||
LogAggregationStatus.TIME_OUT)) {
|
||||
if (report.getLogAggregationStatus().equals(
|
||||
LogAggregationStatus.FINISHED)) {
|
||||
LogAggregationStatus.SUCCEEDED)
|
||||
|| report.getLogAggregationStatus().equals(
|
||||
LogAggregationStatus.FAILED)) {
|
||||
curReport.setLogAggregationStatus(report
|
||||
.getLogAggregationStatus());
|
||||
}
|
||||
@ -1444,4 +1455,70 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
||||
if (!logAggregationEnabled) {
|
||||
return LogAggregationStatus.DISABLED;
|
||||
}
|
||||
if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
|
||||
|| this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
|
||||
return this.logAggregationStatusForAppReport;
|
||||
}
|
||||
try {
|
||||
this.readLock.lock();
|
||||
Map<NodeId, LogAggregationReport> reports =
|
||||
getLogAggregationReportsForApp();
|
||||
if (reports.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
int logNotStartCount = 0;
|
||||
int logCompletedCount = 0;
|
||||
int logTimeOutCount = 0;
|
||||
int logFailedCount = 0;
|
||||
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
|
||||
switch (report.getValue().getLogAggregationStatus()) {
|
||||
case NOT_START:
|
||||
logNotStartCount++;
|
||||
break;
|
||||
case SUCCEEDED:
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case FAILED:
|
||||
logFailedCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case TIME_OUT:
|
||||
logTimeOutCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (logNotStartCount == reports.size()) {
|
||||
return LogAggregationStatus.NOT_START;
|
||||
} else if (logCompletedCount == reports.size()) {
|
||||
// We should satisfy two condition in order to return SUCCEEDED or FAILED
|
||||
// 1) make sure the application is in final state
|
||||
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
|
||||
// The SUCCEEDED/FAILED status is the final status which means
|
||||
// the log aggregation is finished. And the log aggregation status will
|
||||
// not be updated anymore.
|
||||
if (logFailedCount > 0 && isAppInFinalState(this)) {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
|
||||
return LogAggregationStatus.FAILED;
|
||||
} else if (logTimeOutCount > 0) {
|
||||
return LogAggregationStatus.TIME_OUT;
|
||||
}
|
||||
if (isAppInFinalState(this)) {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
|
||||
return LogAggregationStatus.SUCCEEDED;
|
||||
}
|
||||
}
|
||||
return LogAggregationStatus.RUNNING;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,10 +30,10 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
@ -93,15 +93,16 @@ protected void render(Block html) {
|
||||
.td("Log Aggregation does not Start.")._();
|
||||
table_description.tr().td(LogAggregationStatus.RUNNING.name())
|
||||
.td("Log Aggregation is Running.")._();
|
||||
table_description.tr().td(LogAggregationStatus.FINISHED.name())
|
||||
.td("Log Aggregation is Finished. All of the logs have been "
|
||||
table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
|
||||
.td("Log Aggregation is Succeeded. All of the logs have been "
|
||||
+ "aggregated successfully.")._();
|
||||
table_description.tr().td(LogAggregationStatus.FAILED.name())
|
||||
.td("Log Aggregation is Failed. At least one of the logs "
|
||||
+ "have not been aggregated.")._();
|
||||
table_description.tr().td(LogAggregationStatus.TIME_OUT.name())
|
||||
.td("Does not get the Log aggregation status for a long time. "
|
||||
+ "Not sure what is the current Log Aggregation Status.")._();
|
||||
.td("The application is finished, but the log aggregation status is "
|
||||
+ "not updated for a long time. Not sure whether the log aggregation "
|
||||
+ "is finished or not.")._();
|
||||
table_description._();
|
||||
div_description._();
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
@ -94,6 +95,8 @@ public class AppInfo {
|
||||
|
||||
protected List<ResourceRequest> resourceRequests;
|
||||
|
||||
protected LogAggregationStatus logAggregationStatus;
|
||||
|
||||
public AppInfo() {
|
||||
} // JAXB needs this
|
||||
|
||||
@ -141,7 +144,7 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
|
||||
this.finishedTime = app.getFinishTime();
|
||||
this.elapsedTime = Times.elapsed(app.getStartTime(),
|
||||
app.getFinishTime());
|
||||
|
||||
this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
if (attempt != null) {
|
||||
Container masterContainer = attempt.getMasterContainer();
|
||||
@ -314,4 +317,8 @@ public long getVcoreSeconds() {
|
||||
public List<ResourceRequest> getResourceRequests() {
|
||||
return this.resourceRequests;
|
||||
}
|
||||
|
||||
public LogAggregationStatus getLogAggregationStatus() {
|
||||
return this.logAggregationStatus;
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -196,6 +197,11 @@ public ResourceRequest getAMResourceRequest() {
|
||||
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
}
|
||||
|
||||
public static RMApp newApplication(int i) {
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -38,7 +39,6 @@
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
@ -267,7 +267,7 @@ public void testLogAggregationStatus() throws Exception {
|
||||
|
||||
// Finally, node1 finished its log aggregation and sent out its final
|
||||
// log aggregation status. The log aggregation status for node1 should
|
||||
// be changed from TIME_OUT to Finished
|
||||
// be changed from TIME_OUT to SUCCEEDED
|
||||
Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
|
||||
new HashMap<ApplicationId, LogAggregationReport>();
|
||||
String messageForNode1_3 =
|
||||
@ -275,7 +275,7 @@ public void testLogAggregationStatus() throws Exception {
|
||||
+ System.currentTimeMillis();
|
||||
LogAggregationReport report1_3 =
|
||||
LogAggregationReport.newInstance(appId, nodeId1,
|
||||
LogAggregationStatus.FINISHED, messageForNode1_3);
|
||||
LogAggregationStatus.SUCCEEDED, messageForNode1_3);
|
||||
node1ReportForApp3.put(appId, report1_3);
|
||||
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
|
||||
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
|
||||
@ -288,7 +288,7 @@ public void testLogAggregationStatus() throws Exception {
|
||||
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
|
||||
.entrySet()) {
|
||||
if (report.getKey().equals(node1.getNodeID())) {
|
||||
Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
|
||||
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
|
||||
.getLogAggregationStatus());
|
||||
Assert.assertEquals(messageForNode1_1 + messageForNode1_2
|
||||
+ messageForNode1_3, report.getValue().getDiagnosticMessage());
|
||||
@ -303,6 +303,104 @@ public void testLogAggregationStatus() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testGetLogAggregationStatusForAppReport() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
// Disable the log aggregation
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
|
||||
RMAppImpl rmApp = (RMAppImpl)createRMApp(conf);
|
||||
// The log aggregation status should be DISABLED.
|
||||
Assert.assertEquals(LogAggregationStatus.DISABLED,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
// Enable the log aggregation
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
rmApp = (RMAppImpl)createRMApp(conf);
|
||||
// If we do not know any NodeManagers for this application ,
|
||||
// the log aggregation status will return null
|
||||
Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
|
||||
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
|
||||
NodeId nodeId3 = NodeId.newInstance("localhost", 3333);
|
||||
NodeId nodeId4 = NodeId.newInstance("localhost", 4444);
|
||||
|
||||
// If the log aggregation status for all NMs are NOT_START,
|
||||
// the log aggregation status for this app will return NOT_START
|
||||
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
|
||||
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
|
||||
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
|
||||
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
|
||||
Assert.assertEquals(LogAggregationStatus.NOT_START,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
|
||||
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
|
||||
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
|
||||
Assert.assertEquals(LogAggregationStatus.RUNNING,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
|
||||
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
|
||||
|
||||
// If at least of one log aggregation status for one NM is TIME_OUT,
|
||||
// others are SUCCEEDED, the log aggregation status for this app will
|
||||
// return TIME_OUT
|
||||
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
|
||||
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
// If the log aggregation status for all NMs are SUCCEEDED and Application
|
||||
// is at the final state, the log aggregation status for this app will
|
||||
// return SUCCEEDED
|
||||
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
rmApp = (RMAppImpl)createRMApp(conf);
|
||||
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
|
||||
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
|
||||
// If at least of one log aggregation status for one NM is FAILED,
|
||||
// others are either SUCCEEDED or TIME_OUT, and this application is
|
||||
// at the final state, the log aggregation status for this app
|
||||
// will return FAILED
|
||||
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
|
||||
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
|
||||
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
|
||||
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
|
||||
Assert.assertEquals(LogAggregationStatus.FAILED,
|
||||
rmApp.getLogAggregationStatusForAppReport());
|
||||
|
||||
}
|
||||
|
||||
private RMApp createRMApp(Configuration conf) {
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
ApplicationSubmissionContext.newInstance(appId, "test", "default",
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -277,4 +278,9 @@ public ResourceRequest getAMResourceRequest() {
|
||||
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -1307,14 +1307,15 @@ public void verifyAppsXML(NodeList nodes, RMApp app) throws JSONException,
|
||||
WebServicesTestUtils.getXmlInt(element, "preemptedResourceMB"),
|
||||
WebServicesTestUtils.getXmlInt(element, "preemptedResourceVCores"),
|
||||
WebServicesTestUtils.getXmlInt(element, "numNonAMContainerPreempted"),
|
||||
WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"));
|
||||
WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"),
|
||||
WebServicesTestUtils.getXmlString(element, "logAggregationStatus"));
|
||||
}
|
||||
}
|
||||
|
||||
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
||||
Exception {
|
||||
|
||||
assertEquals("incorrect number of elements", 27, info.length());
|
||||
assertEquals("incorrect number of elements", 28, info.length());
|
||||
|
||||
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
|
||||
info.getString("name"), info.getString("applicationType"),
|
||||
@ -1329,7 +1330,8 @@ public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
||||
info.getInt("preemptedResourceMB"),
|
||||
info.getInt("preemptedResourceVCores"),
|
||||
info.getInt("numNonAMContainerPreempted"),
|
||||
info.getInt("numAMContainerPreempted"));
|
||||
info.getInt("numAMContainerPreempted"),
|
||||
info.getString("logAggregationStatus"));
|
||||
}
|
||||
|
||||
public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||
@ -1339,7 +1341,8 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||
long elapsedTime, String amHostHttpAddress, String amContainerLogs,
|
||||
int allocatedMB, int allocatedVCores, int numContainers,
|
||||
int preemptedResourceMB, int preemptedResourceVCores,
|
||||
int numNonAMContainerPreempted, int numAMContainerPreempted) throws JSONException,
|
||||
int numNonAMContainerPreempted, int numAMContainerPreempted,
|
||||
String logAggregationStatus) throws JSONException,
|
||||
Exception {
|
||||
|
||||
WebServicesTestUtils.checkStringMatch("id", app.getApplicationId()
|
||||
@ -1386,6 +1389,9 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||
assertEquals("numAMContainerPreempted doesn't match", app
|
||||
.getRMAppMetrics().getNumAMContainersPreempted(),
|
||||
numAMContainerPreempted);
|
||||
assertEquals("Log aggregation Status doesn't match", app
|
||||
.getLogAggregationStatusForAppReport().toString(),
|
||||
logAggregationStatus);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user