From c94f2cec3a4d46718aa4cd144fb32e328406b9b3 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 9 Jun 2014 19:44:31 +0000 Subject: [PATCH] Augmented RMStateStore with state machine. Contributed by Binglin Chang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1601491 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../recovery/FileSystemRMStateStore.java | 10 +- .../recovery/MemoryRMStateStore.java | 12 +- .../recovery/NullRMStateStore.java | 12 +- .../recovery/RMStateStore.java | 274 +++++++++++------- .../recovery/ZKRMStateStore.java | 10 +- .../records/ApplicationAttemptStateData.java | 78 +++-- .../records/ApplicationStateData.java | 55 +++- .../pb/ApplicationAttemptStateDataPBImpl.java | 54 ++-- .../impl/pb/ApplicationStateDataPBImpl.java | 46 ++- .../server/resourcemanager/TestRMRestart.java | 14 +- .../recovery/TestFSRMStateStore.java | 7 +- 12 files changed, 355 insertions(+), 219 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ed85bf2f30..e3a1f6ed59 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -149,6 +149,8 @@ Release 2.5.0 - UNRELEASED YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception it encounters. (Vamsee Yarlagadda via kasha) + YARN-2030. Augmented RMStateStore with state machine. (Binglin Chang via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 1f6e175ced..7f4dad83fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -314,7 +316,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); fs.mkdirs(appDirPath); @@ -334,7 +336,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); Path nodeCreatePath = getNodePath(appDirPath, appIdStr); @@ -354,7 +356,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -375,7 +377,7 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index c9f3541f53..a43b20da39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import com.google.common.annotations.VisibleForTesting; @@ -80,7 +80,7 @@ protected synchronized void closeInternal() throws Exception { @Override public void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) + ApplicationStateData appStateData) throws Exception { ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), @@ -92,7 +92,7 @@ public void storeApplicationStateInternal(ApplicationId appId, @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { ApplicationState updatedAppState = new ApplicationState(appStateData.getSubmitTime(), appStateData.getStartTime(), @@ -112,7 +112,7 @@ public void updateApplicationStateInternal(ApplicationId appId, @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { Credentials credentials = null; if(attemptStateData.getAppAttemptTokens() != null){ @@ -137,7 +137,7 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { Credentials credentials = null; if (attemptStateData.getAppAttemptTokens() != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index a12099f46f..6a0426c0e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @Unstable public class NullRMStateStore extends RMStateStore { @@ -54,13 +54,13 @@ public RMState loadState() throws Exception { @Override protected void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { // Do nothing } @Override protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // Do nothing } @@ -102,13 +102,13 @@ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Excepti @Override protected void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { // Do nothing } @Override protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index fc4537c793..affc6f9d86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; @Private @Unstable @@ -83,8 +87,163 @@ public abstract class RMStateStore extends AbstractService { public static final Log LOG = LogFactory.getLog(RMStateStore.class); + private enum RMStateStoreState { + DEFAULT + }; + + private static final StateMachineFactory + stateMachineFactory = new StateMachineFactory( + RMStateStoreState.DEFAULT) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP, new StoreAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()); + + private final StateMachine stateMachine; + + private static class StoreAppTransition + implements SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateData appStateData = ApplicationStateData + .newInstance(appState); + LOG.info("Storing info for app: " + appId); + try { + store.storeApplicationStateInternal(appId, appStateData); + store.notifyDoneStoringApplication(appId, null); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateData appStateData = ApplicationStateData + .newInstance(appState); + LOG.info("Updating info for app: " + appId); + try { + store.updateApplicationStateInternal(appId, appStateData); + store.notifyDoneUpdatingApplication(appId, null); + } catch (Exception e) { + LOG.error("Error updating app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class RemoveAppTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) + .getAppState(); + ApplicationId appId = appState.getAppId(); + LOG.info("Removing info for app: " + appId); + try { + store.removeApplicationStateInternal(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class StoreAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateData attemptStateData = + ApplicationAttemptStateData.newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } + store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppAttemptTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData + .newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); + } + store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + null); + } catch (Exception e) { + LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + store.notifyStoreOperationFailed(e); + } + }; + } + public RMStateStore() { super(RMStateStore.class.getName()); + stateMachine = stateMachineFactory.make(this); } /** @@ -390,10 +549,10 @@ public synchronized void updateApplicationState(ApplicationState appState) { * application. */ protected abstract void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception; + ApplicationStateData appStateData) throws Exception; protected abstract void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception; + ApplicationStateData appStateData) throws Exception; @SuppressWarnings("unchecked") /** @@ -428,11 +587,11 @@ public synchronized void updateApplicationAttemptState( */ protected abstract void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; + ApplicationAttemptStateData attemptStateData) throws Exception; protected abstract void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; + ApplicationAttemptStateData attemptStateData) throws Exception; /** * RMDTSecretManager call this to store the state of a delegation token @@ -596,105 +755,10 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - if (event.getType().equals(RMStateStoreEventType.STORE_APP) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { - ApplicationState appState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - appState = ((RMStateStoreAppEvent) event).getAppState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - appState = ((RMStateUpdateAppEvent) event).getAppState(); - } - - Exception storedException = null; - ApplicationStateDataPBImpl appStateData = - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), - appState.getApplicationSubmissionContext(), appState.getState(), - appState.getDiagnostics(), appState.getFinishTime()); - - ApplicationId appId = - appState.getApplicationSubmissionContext().getApplicationId(); - - LOG.info("Storing info for app: " + appId); - try { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId, appStateData); - notifyDoneStoringApplication(appId, storedException); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId, appStateData); - notifyDoneUpdatingApplication(appId, storedException); - } - } catch (Exception e) { - LOG.error("Error storing/updating app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { - - ApplicationAttemptState attemptState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - attemptState = - ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT); - attemptState = - ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); - } - - Exception storedException = null; - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - try { - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl - .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), - attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus()); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); - } - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - storedException); - } else { - assert event.getType().equals( - RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - storedException); - } - } catch (Exception e) { - LOG.error( - "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { - ApplicationState appState = - ((RMStateStoreRemoveAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - LOG.info("Removing info for app: " + appId); - try { - removeApplicationStateInternal(appState); - } catch (Exception e) { - LOG.error("Error removing app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else { - LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + try { + this.stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 31c8885d4f..63ae990732 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -551,7 +553,7 @@ private void loadApplicationAttemptState(ApplicationState appState, @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -565,7 +567,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -587,7 +589,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId, @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { String appDirPath = getNodePath(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -605,7 +607,7 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { String appIdStr = appAttemptId.getApplicationId().toString(); String appAttemptIdStr = appAttemptId.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 255800e86b..6af048b2e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,31 +18,73 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; /* * Contains the state data that needs to be persisted for an ApplicationAttempt */ @Public @Unstable -public interface ApplicationAttemptStateData { - +public abstract class ApplicationAttemptStateData { + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus) { + ApplicationAttemptStateData attemptStateData = + Records.newRecord(ApplicationAttemptStateData.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setFinalTrackingUrl(finalTrackingUrl); + attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); + return attemptStateData; + } + + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptState attemptState) throws IOException { + Credentials credentials = attemptState.getAppAttemptCredentials(); + ByteBuffer appAttemptTokens = null; + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return newInstance(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getFinalTrackingUrl(), + attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus()); + } + + public abstract ApplicationAttemptStateDataProto getProto(); + /** * The ApplicationAttemptId for the application attempt * @return ApplicationAttemptId for the application attempt */ @Public @Unstable - public ApplicationAttemptId getAttemptId(); + public abstract ApplicationAttemptId getAttemptId(); - public void setAttemptId(ApplicationAttemptId attemptId); + public abstract void setAttemptId(ApplicationAttemptId attemptId); /* * The master container running the application attempt @@ -50,9 +92,9 @@ public interface ApplicationAttemptStateData { */ @Public @Unstable - public Container getMasterContainer(); + public abstract Container getMasterContainer(); - public void setMasterContainer(Container container); + public abstract void setMasterContainer(Container container); /** * The application attempt tokens that belong to this attempt @@ -60,17 +102,17 @@ public interface ApplicationAttemptStateData { */ @Public @Unstable - public ByteBuffer getAppAttemptTokens(); + public abstract ByteBuffer getAppAttemptTokens(); - public void setAppAttemptTokens(ByteBuffer attemptTokens); + public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); /** * Get the final state of the application attempt. * @return the final state of the application attempt. */ - public RMAppAttemptState getState(); + public abstract RMAppAttemptState getState(); - public void setState(RMAppAttemptState state); + public abstract void setState(RMAppAttemptState state); /** * Get the original not-proxied final tracking url for the @@ -79,34 +121,34 @@ public interface ApplicationAttemptStateData { * @return the original not-proxied final tracking url for the * application */ - public String getFinalTrackingUrl(); + public abstract String getFinalTrackingUrl(); /** * Set the final tracking Url of the AM. * @param url */ - public void setFinalTrackingUrl(String url); + public abstract void setFinalTrackingUrl(String url); /** * Get the diagnositic information of the attempt * @return diagnositic information of the attempt */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * Get the start time of the application. * @return start time of the application */ - public long getStartTime(); + public abstract long getStartTime(); - public void setStartTime(long startTime); + public abstract void setStartTime(long startTime); /** * Get the final finish status of the application. * @return final finish status of the application */ - public FinalApplicationStatus getFinalApplicationStatus(); + public abstract FinalApplicationStatus getFinalApplicationStatus(); - public void setFinalApplicationStatus(FinalApplicationStatus finishState); + public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 9fce6cf12d..55b726ffd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -24,7 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.Records; /** * Contains all the state data that needs to be stored persistently @@ -32,19 +35,43 @@ */ @Public @Unstable -public interface ApplicationStateData { - +public abstract class ApplicationStateData { + public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, + RMAppState state, String diagnostics, long finishTime) { + ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + public static ApplicationStateData newInstance( + ApplicationState appState) { + return newInstance(appState.getSubmitTime(), appState.getStartTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + appState.getState(), appState.getDiagnostics(), + appState.getFinishTime()); + } + + public abstract ApplicationStateDataProto getProto(); + /** * The time at which the application was received by the Resource Manager * @return submitTime */ @Public @Unstable - public long getSubmitTime(); + public abstract long getSubmitTime(); @Public @Unstable - public void setSubmitTime(long submitTime); + public abstract void setSubmitTime(long submitTime); /** * Get the start time of the application. @@ -63,11 +90,11 @@ public interface ApplicationStateData { */ @Public @Unstable - public void setUser(String user); + public abstract void setUser(String user); @Public @Unstable - public String getUser(); + public abstract String getUser(); /** * The {@link ApplicationSubmissionContext} for the application @@ -76,34 +103,34 @@ public interface ApplicationStateData { */ @Public @Unstable - public ApplicationSubmissionContext getApplicationSubmissionContext(); + public abstract ApplicationSubmissionContext getApplicationSubmissionContext(); @Public @Unstable - public void setApplicationSubmissionContext( + public abstract void setApplicationSubmissionContext( ApplicationSubmissionContext context); /** * Get the final state of the application. * @return the final state of the application. */ - public RMAppState getState(); + public abstract RMAppState getState(); - public void setState(RMAppState state); + public abstract void setState(RMAppState state); /** * Get the diagnostics information for the application master. * @return the diagnostics information for the application master. */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * The finish time of the application. * @return the finish time of the application., */ - public long getFinishTime(); + public abstract long getFinishTime(); - public void setFinishTime(long finishTime); + public abstract void setFinishTime(long finishTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 75ac2eef9a..e3ebe5e089 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; @@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -public class ApplicationAttemptStateDataPBImpl -extends ProtoBase -implements ApplicationAttemptStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationAttemptStateDataPBImpl extends + ApplicationAttemptStateData { ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.Builder builder = null; @@ -60,7 +55,8 @@ public ApplicationAttemptStateDataPBImpl( this.proto = proto; viaProto = true; } - + + @Override public ApplicationAttemptStateDataProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -76,7 +72,8 @@ private void mergeLocalToBuilder() { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } if(this.appAttemptTokens != null) { - builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat( + this.appAttemptTokens)); } } @@ -148,7 +145,8 @@ public ByteBuffer getAppAttemptTokens() { if(!p.hasAppAttemptTokens()) { return null; } - this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( + p.getAppAttemptTokens()); return appAttemptTokens; } @@ -249,24 +247,26 @@ public void setFinalApplicationStatus(FinalApplicationStatus finishState) { builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); } - public static ApplicationAttemptStateData newApplicationAttemptStateData( - ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus) { - ApplicationAttemptStateData attemptStateData = - recordFactory.newRecordInstance(ApplicationAttemptStateData.class); - attemptStateData.setAttemptId(attemptId); - attemptStateData.setMasterContainer(container); - attemptStateData.setAppAttemptTokens(attemptTokens); - attemptStateData.setState(finalState); - attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); - attemptStateData.setStartTime(startTime); - attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); - return attemptStateData; + @Override + public int hashCode() { + return getProto().hashCode(); } + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index ede8ca7c46..8aaf1a4a7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -20,21 +20,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -public class ApplicationStateDataPBImpl -extends ProtoBase -implements ApplicationStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationStateDataPBImpl extends ApplicationStateData { ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.Builder builder = null; @@ -51,7 +45,8 @@ public ApplicationStateDataPBImpl( this.proto = proto; viaProto = true; } - + + @Override public ApplicationStateDataProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -136,7 +131,7 @@ public ApplicationSubmissionContext getApplicationSubmissionContext() { } applicationSubmissionContext = new ApplicationSubmissionContextPBImpl( - p.getApplicationSubmissionContext()); + p.getApplicationSubmissionContext()); return applicationSubmissionContext; } @@ -200,21 +195,24 @@ public void setFinishTime(long finishTime) { builder.setFinishTime(finishTime); } - public static ApplicationStateData newApplicationStateData(long submitTime, - long startTime, String user, - ApplicationSubmissionContext submissionContext, RMAppState state, - String diagnostics, long finishTime) { + @Override + public int hashCode() { + return getProto().hashCode(); + } - ApplicationStateData appState = - recordFactory.newRecordInstance(ApplicationStateData.class); - appState.setSubmitTime(submitTime); - appState.setStartTime(startTime); - appState.setUser(user); - appState.setApplicationSubmissionContext(submissionContext); - appState.setState(state); - appState.setDiagnostics(diagnostics); - appState.setFinishTime(finishTime); - return appState; + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); } private static String RM_APP_PREFIX = "RMAPP_"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 3bdb66c4b4..9c2d87e444 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -84,8 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -612,7 +612,7 @@ public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { if (count == 0) { // do nothing; simulate app final state is not saved. LOG.info(appId + " final state is not saved."); @@ -760,14 +760,14 @@ public void testRMRestartKilledAppWithNoAttempts() throws Exception { @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // ignore attempt saving request. } @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // ignore attempt saving request. } }; @@ -1862,7 +1862,7 @@ public class TestMemoryRMStateStore extends MemoryRMStateStore { @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { updateApp = ++count; super.updateApplicationStateInternal(appId, appStateData); } @@ -1871,7 +1871,7 @@ public void updateApplicationStateInternal(ApplicationId appId, public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { updateAttempt = ++count; super.updateApplicationAttemptStateInternal(attemptId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 792b73e5a6..da25c5beda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -37,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; @@ -213,9 +213,8 @@ public void run() { try { store.storeApplicationStateInternal( ApplicationId.newInstance(100L, 1), - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + ApplicationStateData.newInstance(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); } catch (Exception e) { // TODO 0 datanode exception will not be retried by dfs client, fix // that separately.