From 29ddce96a26c45fc747408801bf65255fbd9990f Mon Sep 17 00:00:00 2001 From: Hitesh Shah Date: Wed, 23 Jan 2013 00:57:37 +0000 Subject: [PATCH] YARN-231. RM Restart - Add FS-based persistent store implementation for RMStateStore. Contributed by Bikas Saha git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1437245 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/records/ApplicationAttemptId.java | 6 +- .../yarn/api/records/ApplicationId.java | 6 +- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 11 + .../pom.xml | 6 + .../recovery/FileSystemRMStateStore.java | 233 ++++++++++++++ .../recovery/NullRMStateStore.java | 4 +- .../recovery/TestRMStateStore.java | 284 ++++++++++++++++++ 9 files changed, 552 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5bbfb9659d..5df7feb2ac 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -37,6 +37,9 @@ Release 2.0.3-alpha - Unreleased YARN-328. Use token request messages defined in hadoop common. (suresh) + YARN-231. RM Restart - Add FS-based persistent store implementation for + RMStateStore (Bikas Saha via hitesh) + IMPROVEMENTS YARN-223. Update process tree instead of getting new process trees. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java index 58baec2d84..063a878b7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptId.java @@ -38,6 +38,8 @@ public abstract class ApplicationAttemptId implements Comparable { + public static final String appAttemptIdStrPrefix = "appattempt_"; + /** * Get the ApplicationId of the ApplicationAttempId. * @return ApplicationId of the ApplicationAttempId @@ -111,11 +113,11 @@ public int compareTo(ApplicationAttemptId other) { @Override public String toString() { - StringBuilder sb = new StringBuilder("appattempt_"); + StringBuilder sb = new StringBuilder(appAttemptIdStrPrefix); sb.append(this.getApplicationId().getClusterTimestamp()).append("_"); sb.append(ApplicationId.appIdFormat.get().format( this.getApplicationId().getId())); sb.append("_").append(attemptIdFormat.get().format(getAttemptId())); return sb.toString(); } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java index 21c6502e71..45aa015779 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationId.java @@ -38,6 +38,8 @@ @Stable public abstract class ApplicationId implements Comparable { + public static final String appIdStrPrefix = "application_"; + /** * Get the short integer identifier of the ApplicationId * which is unique for all applications started by a particular instance @@ -88,7 +90,7 @@ public int compareTo(ApplicationId other) { @Override public String toString() { - return "application_" + this.getClusterTimestamp() + "_" + return appIdStrPrefix + this.getClusterTimestamp() + "_" + appIdFormat.get().format(getId()); } @@ -119,4 +121,4 @@ public boolean equals(Object obj) { return false; return true; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 832bc0737c..81c1fe933c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -236,6 +236,10 @@ public class YarnConfiguration extends Configuration { /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; + /** URI for FileSystemRMStateStore */ + public static final String FS_RM_STATE_STORE_URI = + RM_PREFIX + "fs.rm-state-store.uri"; + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ab60868380..e28ac43e85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -230,6 +230,17 @@ The class to use as the persistent store. yarn.resourcemanager.store.class + org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore + + + + URI pointing to the location of the FileSystem path where + RM state will be stored. This must be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.fs.rm-state-store.uri + ${hadoop.tmp.dir}/yarn/system/rmstore + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index e015f01634..06f58a88e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -41,6 +41,12 @@ org.apache.hadoop hadoop-yarn-server-web-proxy + + org.apache.hadoop + hadoop-hdfs + test-jar + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java new file mode 100644 index 0000000000..aca84adf0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable +/** + * A simple class for storing RM state in any storage that implements a basic + * FileSystem interface. Does not use directories so that simple key-value + * stores can be used. The retry policy for the real filesystem client must be + * configured separately to enable retry of filesystem operations when needed. + */ +public class FileSystemRMStateStore extends RMStateStore { + + public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); + + private static final String ROOT_DIR_NAME = "FSRMStateRoot"; + + + private FileSystem fs; + + private Path fsRootDirPath; + + @VisibleForTesting + Path fsWorkingPath; + + public synchronized void initInternal(Configuration conf) + throws Exception{ + + fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); + fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + + // create filesystem + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(fsRootDirPath); + } + + @Override + protected synchronized void closeInternal() throws Exception { + fs.close(); + } + + @Override + public synchronized RMState loadState() throws Exception { + try { + RMState state = new RMState(); + FileStatus[] childNodes = fs.listStatus(fsRootDirPath); + List attempts = + new ArrayList(); + for(FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = getNodePath(childNodeName); + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ + // application + LOG.info("Loading application from node: " + childNodeName); + ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(childData)); + ApplicationState appState = new ApplicationState( + appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext()); + // assert child node name is same as actual applicationId + assert appId.equals(appState.context.getApplicationId()); + state.appState.put(appId, appState); + } else if(childNodeName.startsWith( + ApplicationAttemptId.appAttemptIdStrPrefix)) { + // attempt + LOG.info("Loading application attempt from node: " + childNodeName); + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(childData)); + ApplicationAttemptState attemptState = new ApplicationAttemptState( + attemptId, attemptStateData.getMasterContainer()); + // assert child node name is same as application attempt id + assert attemptId.equals(attemptState.getAttemptId()); + attempts.add(attemptState); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + + // go through all attempts and add them to their apps + for(ApplicationAttemptState attemptState : attempts) { + ApplicationId appId = attemptState.getAttemptId().getApplicationId(); + ApplicationState appState = state.appState.get(appId); + if(appState != null) { + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } else { + // the application node may have been removed when the application + // completed but the RM might have stopped before it could remove the + // application attempt nodes + LOG.info("Application node not found for attempt: " + + attemptState.getAttemptId()); + deleteFile(getNodePath(attemptState.getAttemptId().toString())); + } + } + + return state; + } catch (Exception e) { + LOG.error("Failed to load state.", e); + throw e; + } + } + + @Override + public synchronized void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateDataPB) + throws Exception { + Path nodeCreatePath = getNodePath(appId); + + LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); + byte[] appStateData = appStateDataPB.getProto().toByteArray(); + try { + // currently throw all exceptions. May need to respond differently for HA + // based on whether we have lost the right to write to FS + writeFile(nodeCreatePath, appStateData); + } catch (Exception e) { + LOG.info("Error storing info for app: " + appId, e); + throw e; + } + } + + @Override + public synchronized void storeApplicationAttemptState(String attemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) + throws Exception { + Path nodeCreatePath = getNodePath(attemptId); + LOG.info("Storing info for attempt: " + attemptId + + " at: " + nodeCreatePath); + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); + try { + // currently throw all exceptions. May need to respond differently for HA + // based on whether we have lost the right to write to FS + writeFile(nodeCreatePath, attemptStateData); + } catch (Exception e) { + LOG.info("Error storing info for attempt: " + attemptId, e); + throw e; + } + } + + @Override + public synchronized void removeApplicationState(ApplicationState appState) + throws Exception { + String appId = appState.getAppId().toString(); + Path nodeRemovePath = getNodePath(appId); + LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); + deleteFile(nodeRemovePath); + for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { + removeApplicationAttemptState(attemptId.toString()); + } + } + + public synchronized void removeApplicationAttemptState(String attemptId) + throws Exception { + Path nodeRemovePath = getNodePath(attemptId); + LOG.info("Removing info for attempt: " + attemptId + + " at: " + nodeRemovePath); + deleteFile(nodeRemovePath); + } + + // FileSystem related code + + private void deleteFile(Path deletePath) throws Exception { + if(!fs.delete(deletePath, true)) { + throw new Exception("Failed to delete " + deletePath); + } + } + + private byte[] readFile(Path inputPath, long len) throws Exception { + FSDataInputStream fsIn = fs.open(inputPath); + // state data will not be that "long" + byte[] data = new byte[(int)len]; + fsIn.readFully(data); + return data; + } + + private void writeFile(Path outputPath, byte[] data) throws Exception { + FSDataOutputStream fsOut = fs.create(outputPath, false); + fsOut.write(data); + fsOut.flush(); + fsOut.close(); + } + + @VisibleForTesting + Path getNodePath(String nodeName) { + return new Path(fsRootDirPath, nodeName); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 6b614606c2..9bbdc3af04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +@Unstable public class NullRMStateStore extends RMStateStore { @Override @@ -36,7 +38,7 @@ protected void closeInternal() throws Exception { @Override public RMState loadState() throws Exception { - return null; + throw new UnsupportedOperationException("Cannot load state from null store"); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java new file mode 100644 index 0000000000..440908feac --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.util.ConverterUtils; + +public class TestRMStateStore { + + public static final Log LOG = LogFactory.getLog(TestRMStateStore.class); + + class TestDispatcher implements Dispatcher, EventHandler { + + ApplicationAttemptId attemptId; + Exception storedException; + + boolean notified = false; + + @SuppressWarnings("rawtypes") + @Override + public void register(Class eventType, EventHandler handler) { + } + + @Override + public void handle(RMAppAttemptStoredEvent event) { + assertEquals(attemptId, event.getApplicationAttemptId()); + assertEquals(storedException, event.getStoredException()); + notified = true; + synchronized (this) { + notifyAll(); + } + } + + @SuppressWarnings("rawtypes") + @Override + public EventHandler getEventHandler() { + return this; + } + + } + + interface RMStateStoreHelper { + RMStateStore getRMStateStore() throws Exception; + void addOrphanAttemptIfNeeded(RMStateStore testStore, + TestDispatcher dispatcher) throws Exception; + boolean isFinalStateValid() throws Exception; + } + + @Test + public void testFSRMStateStore() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + testRMStateStore(fsTester); + } finally { + cluster.shutdown(); + } + } + + class TestFSRMStateStoreTester implements RMStateStoreHelper { + Path workingDirPathURI; + FileSystemRMStateStore store; + MiniDFSCluster cluster; + + class TestFileSystemRMStore extends FileSystemRMStateStore { + TestFileSystemRMStore(Configuration conf) throws Exception { + init(conf); + assertTrue(workingDirPathURI.equals(fsWorkingPath)); + } + } + + public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { + Path workingDirPath = new Path("/Test"); + this.cluster = cluster; + FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(workingDirPath); + Path clusterURI = new Path(cluster.getURI()); + workingDirPathURI = new Path(clusterURI, workingDirPath); + fs.close(); + } + + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString()); + this.store = new TestFileSystemRMStore(conf); + return store; + } + + @Override + public void addOrphanAttemptIfNeeded(RMStateStore testStore, + TestDispatcher dispatcher) throws Exception { + ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( + "appattempt_1352994193343_0003_000001"); + storeAttempt(testStore, attemptId, + "container_1352994193343_0003_01_000001", dispatcher); + } + + @Override + public boolean isFinalStateValid() throws Exception { + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(workingDirPathURI); + if(files.length == 1) { + // only store root directory should exist + return true; + } + return false; + } + } + + void waitNotify(TestDispatcher dispatcher) { + long startTime = System.currentTimeMillis(); + while(!dispatcher.notified) { + synchronized (dispatcher) { + try { + dispatcher.wait(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if(System.currentTimeMillis() - startTime > 1000*60) { + fail("Timed out attempt store notification"); + } + } + dispatcher.notified = false; + } + + void storeApp(RMStateStore store, ApplicationId appId, long time) + throws Exception { + ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appId); + + RMApp mockApp = mock(RMApp.class); + when(mockApp.getApplicationId()).thenReturn(appId); + when(mockApp.getSubmitTime()).thenReturn(time); + when(mockApp.getApplicationSubmissionContext()).thenReturn(context); + store.storeApplication(mockApp); + } + + ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, + String containerIdStr, TestDispatcher dispatcher) + throws Exception { + + Container container = new ContainerPBImpl(); + container.setId(ConverterUtils.toContainerId(containerIdStr)); + RMAppAttempt mockAttempt = mock(RMAppAttempt.class); + when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); + when(mockAttempt.getMasterContainer()).thenReturn(container); + dispatcher.attemptId = attemptId; + dispatcher.storedException = null; + store.storeApplicationAttempt(mockAttempt); + waitNotify(dispatcher); + return container.getId(); + } + + void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { + long submitTime = System.currentTimeMillis(); + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setDispatcher(dispatcher); + + ApplicationAttemptId attemptId1 = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); + ApplicationId appId1 = attemptId1.getApplicationId(); + storeApp(store, appId1, submitTime); + ContainerId containerId1 = storeAttempt(store, attemptId1, + "container_1352994193343_0001_01_000001", dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; + ApplicationAttemptId attemptId2 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ContainerId containerId2 = storeAttempt(store, attemptId2, + "container_1352994193343_0001_02_000001", dispatcher); + + ApplicationAttemptId attemptIdRemoved = ConverterUtils + .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); + ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); + storeApp(store, appIdRemoved, submitTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", dispatcher); + + RMApp mockRemovedApp = mock(RMApp.class); + HashMap attempts = + new HashMap(); + ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(appIdRemoved); + when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); + when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); + when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); + RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); + when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); + attempts.put(attemptIdRemoved, mockRemovedAttempt); + store.removeApplication(mockRemovedApp); + + // add orphan attempt file to simulate incomplete removal of app state + stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher); + + // let things settle down + Thread.sleep(1000); + store.close(); + + // load state + store = stateStoreHelper.getRMStateStore(); + RMState state = store.loadState(); + Map rmAppState = state.getApplicationState(); + + // removed app or orphan attempt is not loaded + assertEquals(1, rmAppState.size()); + + ApplicationState appState = rmAppState.get(appId1); + // app is loaded + assertNotNull(appState); + // app is loaded correctly + assertEquals(submitTime, appState.getSubmitTime()); + // submission context is loaded correctly + assertEquals(appId1, + appState.getApplicationSubmissionContext().getApplicationId()); + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + // attempt1 is loaded correctly + assertNotNull(attemptState); + assertEquals(attemptId1, attemptState.getAttemptId()); + // attempt1 container is loaded correctly + assertEquals(containerId1, attemptState.getMasterContainer().getId()); + attemptState = appState.getAttempt(attemptId2); + // attempt2 is loaded correctly + assertNotNull(attemptState); + assertEquals(attemptId2, attemptState.getAttemptId()); + // attempt2 container is loaded correctly + assertEquals(containerId2, attemptState.getMasterContainer().getId()); + + // assert store is in expected state after everything is cleaned + assertTrue(stateStoreHelper.isFinalStateValid()); + + store.close(); + } + +}