diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bb3d67a836..17bf08cd49 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -202,6 +202,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-6021. MR AM should have working directory in LD_LIBRARY_PATH (jlowe) + MAPREDUCE-6010. HistoryServerFileSystemStateStore fails to update tokens + (jlowe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java index d92a9af4b0..dcea333b5f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java @@ -24,6 +24,8 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,6 +66,7 @@ public class HistoryServerFileSystemStateStoreService private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_"; private static final String TOKEN_FILE_PREFIX = "token_"; private static final String TMP_FILE_PREFIX = "tmp-"; + private static final String UPDATE_TMP_FILE_PREFIX = "update-"; private static final FsPermission DIR_PERMISSIONS = new FsPermission((short)0700); private static final FsPermission FILE_PERMISSIONS = Shell.WINDOWS @@ -90,7 +93,7 @@ protected void initStorage(Configuration conf) @Override protected void startStorage() throws IOException { - fs = rootStatePath.getFileSystem(getConfig()); + fs = createFileSystem(); createDir(rootStatePath); tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME); createDir(tokenStatePath); @@ -101,6 +104,10 @@ protected void startStorage() throws IOException { } } + FileSystem createFileSystem() throws IOException { + return rootStatePath.getFileSystem(getConfig()); + } + @Override protected void closeStorage() throws IOException { // don't close the filesystem as it's part of the filesystem cache @@ -127,7 +134,7 @@ public void storeToken(MRDelegationTokenIdentifier tokenId, throw new IOException(tokenPath + " already exists"); } - createFile(tokenPath, buildTokenData(tokenId, renewDate)); + createNewFile(tokenPath, buildTokenData(tokenId, renewDate)); } @Override @@ -136,7 +143,25 @@ public void updateToken(MRDelegationTokenIdentifier tokenId, if (LOG.isDebugEnabled()) { LOG.debug("Updating token " + tokenId.getSequenceNumber()); } - createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate)); + + // Files cannot be atomically replaced, therefore we write a temporary + // update file, remove the original token file, then rename the update + // file to the token file. During recovery either the token file will be + // used or if that is missing and an update file is present then the + // update file is used. + Path tokenPath = getTokenPath(tokenId); + Path tmp = new Path(tokenPath.getParent(), + UPDATE_TMP_FILE_PREFIX + tokenPath.getName()); + writeFile(tmp, buildTokenData(tokenId, renewDate)); + try { + deleteFile(tokenPath); + } catch (IOException e) { + fs.delete(tmp, false); + throw e; + } + if (!fs.rename(tmp, tokenPath)) { + throw new IOException("Could not rename " + tmp + " to " + tokenPath); + } } @Override @@ -168,7 +193,7 @@ public void storeTokenMasterKey(DelegationKey key) throws IOException { IOUtils.cleanup(LOG, dataStream); } - createFile(keyPath, memStream.toByteArray()); + createNewFile(keyPath, memStream.toByteArray()); } @Override @@ -213,23 +238,33 @@ private void createDir(Path dir) throws IOException { } } - private void createFile(Path file, byte[] data) throws IOException { - final int WRITE_BUFFER_SIZE = 4096; + private void createNewFile(Path file, byte[] data) + throws IOException { Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName()); - FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true, - WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp), - fs.getDefaultBlockSize(tmp), null); + writeFile(tmp, data); + try { + if (!fs.rename(tmp, file)) { + throw new IOException("Could not rename " + tmp + " to " + file); + } + } catch (IOException e) { + fs.delete(tmp, false); + throw e; + } + } + + private void writeFile(Path file, byte[] data) throws IOException { + final int WRITE_BUFFER_SIZE = 4096; + FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true, + WRITE_BUFFER_SIZE, fs.getDefaultReplication(file), + fs.getDefaultBlockSize(file), null); try { try { out.write(data); } finally { IOUtils.cleanup(LOG, out); } - if (!fs.rename(tmp, file)) { - throw new IOException("Could not rename " + tmp + " to " + file); - } } catch (IOException e) { - fs.delete(tmp, false); + fs.delete(file, false); throw e; } } @@ -284,6 +319,19 @@ private void loadTokenMasterKey(HistoryServerState state, Path keyFile, state.tokenMasterKeyState.add(key); } + private void loadTokenFromBucket(int bucketId, + HistoryServerState state, Path tokenFile, long numTokenFileBytes) + throws IOException { + MRDelegationTokenIdentifier token = + loadToken(state, tokenFile, numTokenFileBytes); + int tokenBucketId = getBucketId(token); + if (tokenBucketId != bucketId) { + throw new IOException("Token " + tokenFile + + " should be in bucket " + tokenBucketId + ", found in bucket " + + bucketId); + } + } + private MRDelegationTokenIdentifier loadToken(HistoryServerState state, Path tokenFile, long numTokenFileBytes) throws IOException { MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier(); @@ -308,18 +356,29 @@ private int loadTokensFromBucket(HistoryServerState state, Path bucket) final int bucketId = Integer.parseInt(numStr); int numTokens = 0; FileStatus[] tokenStats = fs.listStatus(bucket); + Set loadedTokens = new HashSet(tokenStats.length); for (FileStatus stat : tokenStats) { String name = stat.getPath().getName(); if (name.startsWith(TOKEN_FILE_PREFIX)) { - MRDelegationTokenIdentifier token = - loadToken(state, stat.getPath(), stat.getLen()); - int tokenBucketId = getBucketId(token); - if (tokenBucketId != bucketId) { - throw new IOException("Token " + stat.getPath() - + " should be in bucket " + tokenBucketId + ", found in bucket " - + bucketId); - } + loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen()); + loadedTokens.add(name); ++numTokens; + } else if (name.startsWith(UPDATE_TMP_FILE_PREFIX)) { + String tokenName = name.substring(UPDATE_TMP_FILE_PREFIX.length()); + if (loadedTokens.contains(tokenName)) { + // already have the token, update may be partial so ignore it + fs.delete(stat.getPath(), false); + } else { + // token is missing, so try to parse the update temp file + loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen()); + fs.rename(stat.getPath(), + new Path(stat.getPath().getParent(), tokenName)); + loadedTokens.add(tokenName); + ++numTokens; + } + } else if (name.startsWith(TMP_FILE_PREFIX)) { + // cleanup incomplete temp files + fs.delete(stat.getPath(), false); } else { LOG.warn("Skipping unexpected file in history server token bucket: " + stat.getPath()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java index c915996331..0a790028a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java @@ -21,12 +21,19 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; @@ -35,6 +42,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; public class TestHistoryServerFileSystemStateStoreService { @@ -74,8 +82,8 @@ private HistoryServerStateStoreService createAndStartStore() return store; } - @Test - public void testTokenStore() throws IOException { + private void testTokenStore(String stateStoreUri) throws IOException { + conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, stateStoreUri); HistoryServerStateStoreService store = createAndStartStore(); HistoryServerState state = store.loadState(); @@ -161,4 +169,77 @@ public void testTokenStore() throws IOException { assertTrue("missing master key 3", state.tokenMasterKeyState.contains(key3)); } + + @Test + public void testTokenStore() throws IOException { + testTokenStore(testDir.getAbsoluteFile().toURI().toString()); + } + + @Test + public void testTokenStoreHdfs() throws IOException { + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + conf = cluster.getConfiguration(0); + try { + testTokenStore("/tmp/historystore"); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testUpdatedTokenRecovery() throws IOException { + IOException intentionalErr = new IOException("intentional error"); + FileSystem fs = FileSystem.getLocal(conf); + final FileSystem spyfs = spy(fs); + // make the update token process fail halfway through where we're left + // with just the temporary update file and no token file + ArgumentMatcher updateTmpMatcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if (argument instanceof Path) { + return ((Path) argument).getName().startsWith("update"); + } + return false; + } + }; + doThrow(intentionalErr) + .when(spyfs).rename(argThat(updateTmpMatcher), isA(Path.class)); + + conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, + testDir.getAbsoluteFile().toURI().toString()); + HistoryServerStateStoreService store = + new HistoryServerFileSystemStateStoreService() { + @Override + FileSystem createFileSystem() throws IOException { + return spyfs; + } + }; + store.init(conf); + store.start(); + + final MRDelegationTokenIdentifier token1 = + new MRDelegationTokenIdentifier(new Text("tokenOwner1"), + new Text("tokenRenewer1"), new Text("tokenUser1")); + token1.setSequenceNumber(1); + final Long tokenDate1 = 1L; + store.storeToken(token1, tokenDate1); + final Long newTokenDate1 = 975318642L; + try { + store.updateToken(token1, newTokenDate1); + fail("intentional error not thrown"); + } catch (IOException e) { + assertEquals(intentionalErr, e); + } + store.close(); + + // verify the update file is seen and parsed upon recovery when + // original token file is missing + store = createAndStartStore(); + HistoryServerState state = store.loadState(); + assertEquals("incorrect loaded token count", 1, state.tokenState.size()); + assertTrue("missing token 1", state.tokenState.containsKey(token1)); + assertEquals("incorrect token 1 date", newTokenDate1, + state.tokenState.get(token1)); + store.close(); + } }