From cd014d57aa8b896da02b5bcadafbd404bca2bc12 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 27 Mar 2017 13:29:09 -0700 Subject: [PATCH] YARN-6339. Improve performance for createAndGetApplicationReport. (Yunjiong Zhao via wangda) --- .../yarn/api/records/impl/pb/ProtoUtils.java | 6 ++-- .../resourcemanager/rmapp/RMAppImpl.java | 32 +++++++++++-------- .../TestRMAppLogAggregationStatus.java | 2 ++ 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index ab283e7c83..926c757f42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -296,6 +296,8 @@ public static ReservationRequestInterpreter convertFromProtoFormat( * Log Aggregation Status */ private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; + private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN = + LOG_AGGREGATION_STATUS_PREFIX.length(); public static LogAggregationStatusProto convertToProtoFormat( LogAggregationStatus e) { return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX @@ -304,8 +306,8 @@ public static LogAggregationStatusProto convertToProtoFormat( public static LogAggregationStatus convertFromProtoFormat( LogAggregationStatusProto e) { - return LogAggregationStatus.valueOf(e.name().replace( - LOG_AGGREGATION_STATUS_PREFIX, "")); + return LogAggregationStatus.valueOf(e.name().substring( + LOG_AGGREGATION_STATUS_PREFIX_LEN)); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9f00b2e5fa..f24908bd5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -34,6 +34,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -177,8 +178,8 @@ public class RMAppImpl implements RMApp, Recoverable { private long logAggregationStartTime = 0; private final long logAggregationStatusTimeout; private final Map logAggregationStatus = - new HashMap(); - private LogAggregationStatus logAggregationStatusForAppReport; + new ConcurrentHashMap(); + private volatile LogAggregationStatus logAggregationStatusForAppReport; private int logAggregationSucceed = 0; private int logAggregationFailed = 0; private Map> logAggregationDiagnosticsForNMs = @@ -1697,26 +1698,23 @@ public ResourceRequest getAMResourceRequest() { public Map getLogAggregationReportsForApp() { try { this.readLock.lock(); - Map outputs = - new HashMap(); - outputs.putAll(logAggregationStatus); - if (!isLogAggregationFinished()) { - for (Entry output : outputs.entrySet()) { + if (!isLogAggregationFinished() && isAppInFinalState(this) && + System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + for (Entry output : + logAggregationStatus.entrySet()) { if (!output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { + .equals(LogAggregationStatus.FAILED)) { output.getValue().setLogAggregationStatus( LogAggregationStatus.TIME_OUT); } } } - return outputs; + return Collections.unmodifiableMap(logAggregationStatus); } finally { this.readLock.unlock(); } @@ -1824,11 +1822,17 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.TIME_OUT; return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } } else if (logRunningWithFailure > 0) { @@ -1844,7 +1848,9 @@ private boolean isLogAggregationFinished() { return this.logAggregationStatusForAppReport .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED); + .equals(LogAggregationStatus.FAILED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.TIME_OUT); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 55a4eac4ee..677990b8c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -413,6 +413,8 @@ public void testGetLogAggregationStatusForAppReport() { Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); + rmApp = (RMAppImpl)createRMApp(conf); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); // If the log aggregation status for all NMs are SUCCEEDED and Application // is at the final state, the log aggregation status for this app will // return SUCCEEDED