From 7f97fd131981e88ffc76c11a01ea9d190576a6d6 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 19 Oct 2013 18:14:14 +0000 Subject: [PATCH] YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that prevent subsequent ResourceManager recovery. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533803 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/FileSystemRMStateStore.java | 31 ++++- ...teStore.java => RMStateStoreTestBase.java} | 103 +-------------- .../recovery/TestFSRMStateStore.java | 120 ++++++++++++++++++ .../recovery/TestZKRMStateStore.java | 80 ++++++++++++ ...TestZKRMStateStoreZKClientConnections.java | 13 +- 6 files changed, 245 insertions(+), 105 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/{TestRMStateStore.java => RMStateStoreTestBase.java} (80%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1d7cdba261..430734b357 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text file busy errors (Sandy Ryza) + YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that + prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 062f5cc553..e85ba924a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -118,6 +119,9 @@ private void loadRMAppState(RMState rmState) throws Exception { for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); + if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + continue; + } byte[] childData = readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { @@ -178,12 +182,28 @@ private void loadRMAppState(RMState rmState) throws Exception { } } + private boolean checkAndRemovePartialRecord(Path record) throws IOException { + // If the file ends with .tmp then it shows that it failed + // during saving state into state store. The file will be deleted as a + // part of this call + if (record.getName().endsWith(".tmp")) { + LOG.error("incomplete rm state store entry found :" + + record); + fs.delete(record, false); + return true; + } + return false; + } + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); + if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + continue; + } if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { rmState.rmSecretManagerState.dtSequenceNumber = Integer.parseInt(childNodeName.split("_")[1]); @@ -344,10 +364,19 @@ private byte[] readFile(Path inputPath, long len) throws Exception { return data; } + /* + * In order to make this write atomic as a part of write we will first write + * data to .tmp file and then rename it. Here we are assuming that rename is + * atomic for underlying file system. + */ private void writeFile(Path outputPath, byte[] data) throws Exception { - FSDataOutputStream fsOut = fs.create(outputPath, false); + Path tempPath = + new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); + FSDataOutputStream fsOut = null; + fsOut = fs.create(tempPath, false); fsOut.write(data); fsOut.close(); + fs.rename(tempPath, outputPath); } private boolean renameFile(Path src, Path dst) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java similarity index 80% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index d75fc7d9e1..72ef37fa23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -75,9 +76,9 @@ import org.junit.Test; -public class TestRMStateStore extends ClientBaseWithFixes{ +public class RMStateStoreTestBase extends ClientBaseWithFixes{ - public static final Log LOG = LogFactory.getLog(TestRMStateStore.class); + public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); static class TestDispatcher implements Dispatcher, EventHandler { @@ -116,104 +117,6 @@ interface RMStateStoreHelper { boolean isFinalStateValid() throws Exception; } - @Test - public void testZKRMStateStoreRealZK() throws Exception { - TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); - testRMAppStateStore(zkTester); - testRMDTSecretManagerStateStore(zkTester); - } - - @Test - public void testFSRMStateStore() throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - try { - TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); - testRMAppStateStore(fsTester); - testRMDTSecretManagerStateStore(fsTester); - } finally { - cluster.shutdown(); - } - } - - class TestZKRMStateStoreTester implements RMStateStoreHelper { - ZooKeeper client; - ZKRMStateStore store; - - class TestZKRMStateStore extends ZKRMStateStore { - public TestZKRMStateStore(Configuration conf, String workingZnode) - throws Exception { - init(conf); - start(); - assertTrue(znodeWorkingPath.equals(workingZnode)); - } - - @Override - public ZooKeeper getNewZooKeeper() throws IOException { - return client; - } - } - - public RMStateStore getRMStateStore() throws Exception { - String workingZnode = "/Test"; - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); - conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); - this.client = createClient(); - this.store = new TestZKRMStateStore(conf, workingZnode); - return this.store; - } - - @Override - public boolean isFinalStateValid() throws Exception { - List nodes = client.getChildren(store.znodeWorkingPath, false); - return nodes.size() == 1; - } - } - - class TestFSRMStateStoreTester implements RMStateStoreHelper { - Path workingDirPathURI; - FileSystemRMStateStore store; - MiniDFSCluster cluster; - - class TestFileSystemRMStore extends FileSystemRMStateStore { - TestFileSystemRMStore(Configuration conf) throws Exception { - init(conf); - Assert.assertNull(fs); - assertTrue(workingDirPathURI.equals(fsWorkingPath)); - start(); - Assert.assertNotNull(fs); - } - } - - public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { - Path workingDirPath = new Path("/Test"); - this.cluster = cluster; - FileSystem fs = cluster.getFileSystem(); - fs.mkdirs(workingDirPath); - Path clusterURI = new Path(cluster.getURI()); - workingDirPathURI = new Path(clusterURI, workingDirPath); - fs.close(); - } - - @Override - public RMStateStore getRMStateStore() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, - workingDirPathURI.toString()); - this.store = new TestFileSystemRMStore(conf); - return store; - } - - @Override - public boolean isFinalStateValid() throws Exception { - FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(workingDirPathURI); - return files.length == 1; - } - } - void waitNotify(TestDispatcher dispatcher) { long startTime = System.currentTimeMillis(); while(!dispatcher.notified) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java new file mode 100644 index 0000000000..a1a6eab3fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import static org.junit.Assert.assertTrue; +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; + +public class TestFSRMStateStore extends RMStateStoreTestBase { + + public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class); + + class TestFSRMStateStoreTester implements RMStateStoreHelper { + + Path workingDirPathURI; + FileSystemRMStateStore store; + MiniDFSCluster cluster; + + class TestFileSystemRMStore extends FileSystemRMStateStore { + + TestFileSystemRMStore(Configuration conf) throws Exception { + init(conf); + Assert.assertNull(fs); + assertTrue(workingDirPathURI.equals(fsWorkingPath)); + start(); + Assert.assertNotNull(fs); + } + } + + public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { + Path workingDirPath = new Path("/Test"); + this.cluster = cluster; + FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(workingDirPath); + Path clusterURI = new Path(cluster.getURI()); + workingDirPathURI = new Path(clusterURI, workingDirPath); + fs.close(); + } + + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, + workingDirPathURI.toString()); + this.store = new TestFileSystemRMStore(conf); + return store; + } + + @Override + public boolean isFinalStateValid() throws Exception { + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(workingDirPathURI); + return files.length == 1; + } + } + + @Test + public void testFSRMStateStore() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + // If the state store is FileSystemRMStateStore then add corrupted entry. + // It should discard the entry and remove it from file system. + FSDataOutputStream fsOut = null; + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003"; + ApplicationAttemptId attemptId3 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr3); + Path rootDir = + new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot"); + Path appRootDir = new Path(rootDir, "RMAppRoot"); + Path appDir = + new Path(appRootDir, attemptId3.getApplicationId().toString()); + Path tempAppAttemptFile = + new Path(appDir, attemptId3.toString() + ".tmp"); + fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false); + fsOut.write("Some random data ".getBytes()); + fsOut.close(); + + testRMAppStateStore(fsTester); + Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath + .getFileSystem(conf).exists(tempAppAttemptFile)); + testRMDTSecretManagerStateStore(fsTester); + } finally { + cluster.shutdown(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java new file mode 100644 index 0000000000..a6929a8936 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; + +public class TestZKRMStateStore extends RMStateStoreTestBase { + + public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); + + class TestZKRMStateStoreTester implements RMStateStoreHelper { + + ZooKeeper client; + ZKRMStateStore store; + + class TestZKRMStateStoreInternal extends ZKRMStateStore { + + public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) + throws Exception { + init(conf); + start(); + assertTrue(znodeWorkingPath.equals(workingZnode)); + } + + @Override + public ZooKeeper getNewZooKeeper() throws IOException { + return client; + } + } + + public RMStateStore getRMStateStore() throws Exception { + String workingZnode = "/Test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + this.client = createClient(); + this.store = new TestZKRMStateStoreInternal(conf, workingZnode); + return this.store; + } + + @Override + public boolean isFinalStateValid() throws Exception { + List nodes = client.getChildren(store.znodeWorkingPath, false); + return nodes.size() == 1; + } + } + + @Test + public void testZKRMStateStoreRealZK() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + testRMAppStateStore(zkTester); + testRMDTSecretManagerStateStore(zkTester); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 7c807a5b60..82e550c917 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -24,7 +24,7 @@ import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher; import org.apache.hadoop.util.ZKUtil; import org.apache.zookeeper.CreateMode; @@ -43,17 +43,20 @@ public class TestZKRMStateStoreZKClientConnections extends ClientBaseWithFixes { + private static final int ZK_OP_WAIT_TIME = 3000; private Log LOG = LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class); class TestZKClient { + ZKRMStateStore store; boolean forExpire = false; TestForwardingWatcher watcher; CyclicBarrier syncBarrier = new CyclicBarrier(2); protected class TestZKRMStateStore extends ZKRMStateStore { + public TestZKRMStateStore(Configuration conf, String workingZnode) throws Exception { init(conf); @@ -87,6 +90,7 @@ public synchronized void processWatchEvent(WatchedEvent event) private class TestForwardingWatcher extends ClientBaseWithFixes.CountdownWatcher { + public void process(WatchedEvent event) { super.process(event); try { @@ -187,7 +191,7 @@ public void testZKSessionTimeout() throws Exception { } } - @Test (timeout = 20000) + @Test(timeout = 20000) public void testSetZKAcl() { TestZKClient zkClientTester = new TestZKClient(); YarnConfiguration conf = new YarnConfiguration(); @@ -196,10 +200,11 @@ public void testSetZKAcl() { zkClientTester.store.zkClient.delete(zkClientTester.store .znodeWorkingPath, -1); fail("Shouldn't be able to delete path"); - } catch (Exception e) {/* expected behavior */} + } catch (Exception e) {/* expected behavior */ + } } - @Test (timeout = 20000) + @Test(timeout = 20000) public void testInvalidZKAclConfiguration() { TestZKClient zkClientTester = new TestZKClient(); YarnConfiguration conf = new YarnConfiguration();