MAPREDUCE-6010. HistoryServerFileSystemStateStore fails to update tokens. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617984 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-08-14 15:55:03 +00:00
parent e7b540eced
commit 9579554988
3 changed files with 166 additions and 23 deletions

View File

@ -202,6 +202,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6021. MR AM should have working directory in LD_LIBRARY_PATH
(jlowe)
MAPREDUCE-6010. HistoryServerFileSystemStateStore fails to update tokens
(jlowe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -24,6 +24,8 @@
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -64,6 +66,7 @@ public class HistoryServerFileSystemStateStoreService
private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
private static final String TOKEN_FILE_PREFIX = "token_";
private static final String TMP_FILE_PREFIX = "tmp-";
private static final String UPDATE_TMP_FILE_PREFIX = "update-";
private static final FsPermission DIR_PERMISSIONS =
new FsPermission((short)0700);
private static final FsPermission FILE_PERMISSIONS = Shell.WINDOWS
@ -90,7 +93,7 @@ protected void initStorage(Configuration conf)
@Override
protected void startStorage() throws IOException {
fs = rootStatePath.getFileSystem(getConfig());
fs = createFileSystem();
createDir(rootStatePath);
tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
createDir(tokenStatePath);
@ -101,6 +104,10 @@ protected void startStorage() throws IOException {
}
}
FileSystem createFileSystem() throws IOException {
return rootStatePath.getFileSystem(getConfig());
}
@Override
protected void closeStorage() throws IOException {
// don't close the filesystem as it's part of the filesystem cache
@ -127,7 +134,7 @@ public void storeToken(MRDelegationTokenIdentifier tokenId,
throw new IOException(tokenPath + " already exists");
}
createFile(tokenPath, buildTokenData(tokenId, renewDate));
createNewFile(tokenPath, buildTokenData(tokenId, renewDate));
}
@Override
@ -136,7 +143,25 @@ public void updateToken(MRDelegationTokenIdentifier tokenId,
if (LOG.isDebugEnabled()) {
LOG.debug("Updating token " + tokenId.getSequenceNumber());
}
createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate));
// Files cannot be atomically replaced, therefore we write a temporary
// update file, remove the original token file, then rename the update
// file to the token file. During recovery either the token file will be
// used or if that is missing and an update file is present then the
// update file is used.
Path tokenPath = getTokenPath(tokenId);
Path tmp = new Path(tokenPath.getParent(),
UPDATE_TMP_FILE_PREFIX + tokenPath.getName());
writeFile(tmp, buildTokenData(tokenId, renewDate));
try {
deleteFile(tokenPath);
} catch (IOException e) {
fs.delete(tmp, false);
throw e;
}
if (!fs.rename(tmp, tokenPath)) {
throw new IOException("Could not rename " + tmp + " to " + tokenPath);
}
}
@Override
@ -168,7 +193,7 @@ public void storeTokenMasterKey(DelegationKey key) throws IOException {
IOUtils.cleanup(LOG, dataStream);
}
createFile(keyPath, memStream.toByteArray());
createNewFile(keyPath, memStream.toByteArray());
}
@Override
@ -213,23 +238,33 @@ private void createDir(Path dir) throws IOException {
}
}
private void createFile(Path file, byte[] data) throws IOException {
final int WRITE_BUFFER_SIZE = 4096;
private void createNewFile(Path file, byte[] data)
throws IOException {
Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true,
WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp),
fs.getDefaultBlockSize(tmp), null);
writeFile(tmp, data);
try {
if (!fs.rename(tmp, file)) {
throw new IOException("Could not rename " + tmp + " to " + file);
}
} catch (IOException e) {
fs.delete(tmp, false);
throw e;
}
}
private void writeFile(Path file, byte[] data) throws IOException {
final int WRITE_BUFFER_SIZE = 4096;
FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true,
WRITE_BUFFER_SIZE, fs.getDefaultReplication(file),
fs.getDefaultBlockSize(file), null);
try {
try {
out.write(data);
} finally {
IOUtils.cleanup(LOG, out);
}
if (!fs.rename(tmp, file)) {
throw new IOException("Could not rename " + tmp + " to " + file);
}
} catch (IOException e) {
fs.delete(tmp, false);
fs.delete(file, false);
throw e;
}
}
@ -284,6 +319,19 @@ private void loadTokenMasterKey(HistoryServerState state, Path keyFile,
state.tokenMasterKeyState.add(key);
}
private void loadTokenFromBucket(int bucketId,
HistoryServerState state, Path tokenFile, long numTokenFileBytes)
throws IOException {
MRDelegationTokenIdentifier token =
loadToken(state, tokenFile, numTokenFileBytes);
int tokenBucketId = getBucketId(token);
if (tokenBucketId != bucketId) {
throw new IOException("Token " + tokenFile
+ " should be in bucket " + tokenBucketId + ", found in bucket "
+ bucketId);
}
}
private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
Path tokenFile, long numTokenFileBytes) throws IOException {
MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
@ -308,18 +356,29 @@ private int loadTokensFromBucket(HistoryServerState state, Path bucket)
final int bucketId = Integer.parseInt(numStr);
int numTokens = 0;
FileStatus[] tokenStats = fs.listStatus(bucket);
Set<String> loadedTokens = new HashSet<String>(tokenStats.length);
for (FileStatus stat : tokenStats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_FILE_PREFIX)) {
MRDelegationTokenIdentifier token =
loadToken(state, stat.getPath(), stat.getLen());
int tokenBucketId = getBucketId(token);
if (tokenBucketId != bucketId) {
throw new IOException("Token " + stat.getPath()
+ " should be in bucket " + tokenBucketId + ", found in bucket "
+ bucketId);
}
loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
loadedTokens.add(name);
++numTokens;
} else if (name.startsWith(UPDATE_TMP_FILE_PREFIX)) {
String tokenName = name.substring(UPDATE_TMP_FILE_PREFIX.length());
if (loadedTokens.contains(tokenName)) {
// already have the token, update may be partial so ignore it
fs.delete(stat.getPath(), false);
} else {
// token is missing, so try to parse the update temp file
loadTokenFromBucket(bucketId, state, stat.getPath(), stat.getLen());
fs.rename(stat.getPath(),
new Path(stat.getPath().getParent(), tokenName));
loadedTokens.add(tokenName);
++numTokens;
}
} else if (name.startsWith(TMP_FILE_PREFIX)) {
// cleanup incomplete temp files
fs.delete(stat.getPath(), false);
} else {
LOG.warn("Skipping unexpected file in history server token bucket: "
+ stat.getPath());

View File

@ -21,12 +21,19 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
@ -35,6 +42,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
public class TestHistoryServerFileSystemStateStoreService {
@ -74,8 +82,8 @@ private HistoryServerStateStoreService createAndStartStore()
return store;
}
@Test
public void testTokenStore() throws IOException {
private void testTokenStore(String stateStoreUri) throws IOException {
conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, stateStoreUri);
HistoryServerStateStoreService store = createAndStartStore();
HistoryServerState state = store.loadState();
@ -161,4 +169,77 @@ public void testTokenStore() throws IOException {
assertTrue("missing master key 3",
state.tokenMasterKeyState.contains(key3));
}
@Test
public void testTokenStore() throws IOException {
testTokenStore(testDir.getAbsoluteFile().toURI().toString());
}
@Test
public void testTokenStoreHdfs() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
conf = cluster.getConfiguration(0);
try {
testTokenStore("/tmp/historystore");
} finally {
cluster.shutdown();
}
}
@Test
public void testUpdatedTokenRecovery() throws IOException {
IOException intentionalErr = new IOException("intentional error");
FileSystem fs = FileSystem.getLocal(conf);
final FileSystem spyfs = spy(fs);
// make the update token process fail halfway through where we're left
// with just the temporary update file and no token file
ArgumentMatcher<Path> updateTmpMatcher = new ArgumentMatcher<Path>() {
@Override
public boolean matches(Object argument) {
if (argument instanceof Path) {
return ((Path) argument).getName().startsWith("update");
}
return false;
}
};
doThrow(intentionalErr)
.when(spyfs).rename(argThat(updateTmpMatcher), isA(Path.class));
conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
testDir.getAbsoluteFile().toURI().toString());
HistoryServerStateStoreService store =
new HistoryServerFileSystemStateStoreService() {
@Override
FileSystem createFileSystem() throws IOException {
return spyfs;
}
};
store.init(conf);
store.start();
final MRDelegationTokenIdentifier token1 =
new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
new Text("tokenRenewer1"), new Text("tokenUser1"));
token1.setSequenceNumber(1);
final Long tokenDate1 = 1L;
store.storeToken(token1, tokenDate1);
final Long newTokenDate1 = 975318642L;
try {
store.updateToken(token1, newTokenDate1);
fail("intentional error not thrown");
} catch (IOException e) {
assertEquals(intentionalErr, e);
}
store.close();
// verify the update file is seen and parsed upon recovery when
// original token file is missing
store = createAndStartStore();
HistoryServerState state = store.loadState();
assertEquals("incorrect loaded token count", 1, state.tokenState.size());
assertTrue("missing token 1", state.tokenState.containsKey(token1));
assertEquals("incorrect token 1 date", newTokenDate1,
state.tokenState.get(token1));
store.close();
}
}