Fixing HDFS state-store. Contributed by Arun Suresh.
This commit is contained in:
parent
6b710a42e0
commit
9a2a9553ee
@ -25,6 +25,7 @@
|
|||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
@ -84,7 +86,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||||||
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
||||||
"AMRMTokenSecretManagerNode";
|
"AMRMTokenSecretManagerNode";
|
||||||
|
|
||||||
@VisibleForTesting
|
private static final String UNREADABLE_BY_SUPERUSER_XATTRIB =
|
||||||
|
"security.hdfs.unreadable.by.superuser";
|
||||||
protected FileSystem fs;
|
protected FileSystem fs;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected Configuration fsConf;
|
protected Configuration fsConf;
|
||||||
@ -97,6 +100,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||||||
private Path dtSequenceNumberPath = null;
|
private Path dtSequenceNumberPath = null;
|
||||||
private int fsNumRetries;
|
private int fsNumRetries;
|
||||||
private long fsRetryInterval;
|
private long fsRetryInterval;
|
||||||
|
private boolean isHDFS;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Path fsWorkingPath;
|
Path fsWorkingPath;
|
||||||
@ -141,11 +145,17 @@ protected synchronized void startInternal() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fs = fsWorkingPath.getFileSystem(fsConf);
|
fs = fsWorkingPath.getFileSystem(fsConf);
|
||||||
|
isHDFS = fs.getScheme().toLowerCase().contains("hdfs");
|
||||||
mkdirsWithRetries(rmDTSecretManagerRoot);
|
mkdirsWithRetries(rmDTSecretManagerRoot);
|
||||||
mkdirsWithRetries(rmAppRoot);
|
mkdirsWithRetries(rmAppRoot);
|
||||||
mkdirsWithRetries(amrmTokenSecretManagerRoot);
|
mkdirsWithRetries(amrmTokenSecretManagerRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setIsHDFS(boolean isHDFS) {
|
||||||
|
this.isHDFS = isHDFS;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void closeInternal() throws Exception {
|
protected synchronized void closeInternal() throws Exception {
|
||||||
closeWithRetries();
|
closeWithRetries();
|
||||||
@ -175,9 +185,9 @@ protected synchronized void storeVersion() throws Exception {
|
|||||||
byte[] data =
|
byte[] data =
|
||||||
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
if (existsWithRetries(versionNodePath)) {
|
if (existsWithRetries(versionNodePath)) {
|
||||||
updateFile(versionNodePath, data);
|
updateFile(versionNodePath, data, false);
|
||||||
} else {
|
} else {
|
||||||
writeFileWithRetries(versionNodePath, data);
|
writeFileWithRetries(versionNodePath, data, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,12 +204,12 @@ public synchronized long getAndIncrementEpoch() throws Exception {
|
|||||||
// increment epoch and store it
|
// increment epoch and store it
|
||||||
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
||||||
.toByteArray();
|
.toByteArray();
|
||||||
updateFile(epochNodePath, storeData);
|
updateFile(epochNodePath, storeData, false);
|
||||||
} else {
|
} else {
|
||||||
// initialize epoch file with 1 for the next time.
|
// initialize epoch file with 1 for the next time.
|
||||||
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
||||||
.toByteArray();
|
.toByteArray();
|
||||||
writeFileWithRetries(epochNodePath, storeData);
|
writeFileWithRetries(epochNodePath, storeData, false);
|
||||||
}
|
}
|
||||||
return currentEpoch;
|
return currentEpoch;
|
||||||
}
|
}
|
||||||
@ -253,7 +263,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
|
byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
|
||||||
childNodeStatus.getLen());
|
childNodeStatus.getLen());
|
||||||
|
// Set attribute if not already set
|
||||||
|
setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
|
||||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||||
// application
|
// application
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -326,7 +338,7 @@ public boolean accept(Path path) {
|
|||||||
assert newChildNodeStatus.isFile();
|
assert newChildNodeStatus.isFile();
|
||||||
String newChildNodeName = newChildNodeStatus.getPath().getName();
|
String newChildNodeName = newChildNodeStatus.getPath().getName();
|
||||||
String childNodeName = newChildNodeName.substring(
|
String childNodeName = newChildNodeName.substring(
|
||||||
0, newChildNodeName.length() - ".new".length());
|
0, newChildNodeName.length() - ".new".length());
|
||||||
Path childNodePath =
|
Path childNodePath =
|
||||||
new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
|
new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
|
||||||
replaceFile(newChildNodeStatus.getPath(), childNodePath);
|
replaceFile(newChildNodeStatus.getPath(), childNodePath);
|
||||||
@ -394,7 +406,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
|||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
writeFileWithRetries(nodeCreatePath, appStateData);
|
writeFileWithRetries(nodeCreatePath, appStateData, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error storing info for app: " + appId, e);
|
LOG.info("Error storing info for app: " + appId, e);
|
||||||
throw e;
|
throw e;
|
||||||
@ -412,7 +424,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId,
|
|||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
updateFile(nodeCreatePath, appStateData);
|
updateFile(nodeCreatePath, appStateData, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error updating info for app: " + appId, e);
|
LOG.info("Error updating info for app: " + appId, e);
|
||||||
throw e;
|
throw e;
|
||||||
@ -433,7 +445,7 @@ public synchronized void storeApplicationAttemptStateInternal(
|
|||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
writeFileWithRetries(nodeCreatePath, attemptStateData);
|
writeFileWithRetries(nodeCreatePath, attemptStateData, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error storing info for attempt: " + appAttemptId, e);
|
LOG.info("Error storing info for attempt: " + appAttemptId, e);
|
||||||
throw e;
|
throw e;
|
||||||
@ -454,7 +466,7 @@ public synchronized void updateApplicationAttemptStateInternal(
|
|||||||
try {
|
try {
|
||||||
// currently throw all exceptions. May need to respond differently for HA
|
// currently throw all exceptions. May need to respond differently for HA
|
||||||
// based on whether we have lost the right to write to FS
|
// based on whether we have lost the right to write to FS
|
||||||
updateFile(nodeCreatePath, attemptStateData);
|
updateFile(nodeCreatePath, attemptStateData, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Error updating info for attempt: " + appAttemptId, e);
|
LOG.info("Error updating info for attempt: " + appAttemptId, e);
|
||||||
throw e;
|
throw e;
|
||||||
@ -483,7 +495,7 @@ public synchronized void storeRMDelegationTokenState(
|
|||||||
public synchronized void removeRMDelegationTokenState(
|
public synchronized void removeRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier identifier) throws Exception {
|
RMDelegationTokenIdentifier identifier) throws Exception {
|
||||||
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
|
||||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||||
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
deleteFileWithRetries(nodeCreatePath);
|
deleteFileWithRetries(nodeCreatePath);
|
||||||
}
|
}
|
||||||
@ -505,10 +517,10 @@ private void storeOrUpdateRMDelegationTokenState(
|
|||||||
new RMDelegationTokenIdentifierData(identifier, renewDate);
|
new RMDelegationTokenIdentifierData(identifier, renewDate);
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
updateFile(nodeCreatePath, identifierData.toByteArray());
|
updateFile(nodeCreatePath, identifierData.toByteArray(), true);
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||||
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
|
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray(), true);
|
||||||
|
|
||||||
// store sequence number
|
// store sequence number
|
||||||
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
||||||
@ -539,7 +551,7 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
|
|||||||
try (DataOutputStream fsOut = new DataOutputStream(os)) {
|
try (DataOutputStream fsOut = new DataOutputStream(os)) {
|
||||||
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
|
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
|
||||||
masterKey.write(fsOut);
|
masterKey.write(fsOut);
|
||||||
writeFileWithRetries(nodeCreatePath, os.toByteArray());
|
writeFileWithRetries(nodeCreatePath, os.toByteArray(), true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,6 +584,16 @@ private Path getAppDir(Path root, ApplicationId appId) {
|
|||||||
return getNodePath(root, appId.toString());
|
return getNodePath(root, appId.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected Path getAppDir(ApplicationId appId) {
|
||||||
|
return getAppDir(rmAppRoot, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected Path getAppAttemptDir(ApplicationAttemptId appAttId) {
|
||||||
|
return getNodePath(getAppDir(appAttId.getApplicationId()), appAttId
|
||||||
|
.toString());
|
||||||
|
}
|
||||||
// FileSystem related code
|
// FileSystem related code
|
||||||
|
|
||||||
private boolean checkAndRemovePartialRecordWithRetries(final Path record)
|
private boolean checkAndRemovePartialRecordWithRetries(final Path record)
|
||||||
@ -594,12 +616,13 @@ public Void run() throws Exception {
|
|||||||
}.runWithRetries();
|
}.runWithRetries();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeFileWithRetries(final Path outputPath,final byte[] data)
|
private void writeFileWithRetries(final Path outputPath, final byte[] data,
|
||||||
throws Exception {
|
final boolean makeUnreadableByAdmin)
|
||||||
|
throws Exception {
|
||||||
new FSAction<Void>() {
|
new FSAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
writeFile(outputPath, data);
|
writeFile(outputPath, data, makeUnreadableByAdmin);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}.runWithRetries();
|
}.runWithRetries();
|
||||||
@ -746,7 +769,8 @@ private FileStatus getFileStatus(Path path) throws Exception {
|
|||||||
* data to .tmp file and then rename it. Here we are assuming that rename is
|
* data to .tmp file and then rename it. Here we are assuming that rename is
|
||||||
* atomic for underlying file system.
|
* atomic for underlying file system.
|
||||||
*/
|
*/
|
||||||
private void writeFile(Path outputPath, byte[] data) throws Exception {
|
protected void writeFile(Path outputPath, byte[] data, boolean
|
||||||
|
makeUnradableByAdmin) throws Exception {
|
||||||
Path tempPath =
|
Path tempPath =
|
||||||
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
|
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
|
||||||
FSDataOutputStream fsOut = null;
|
FSDataOutputStream fsOut = null;
|
||||||
@ -754,6 +778,9 @@ private void writeFile(Path outputPath, byte[] data) throws Exception {
|
|||||||
// final status.
|
// final status.
|
||||||
try {
|
try {
|
||||||
fsOut = fs.create(tempPath, true);
|
fsOut = fs.create(tempPath, true);
|
||||||
|
if (makeUnradableByAdmin) {
|
||||||
|
setUnreadableBySuperuserXattrib(tempPath);
|
||||||
|
}
|
||||||
fsOut.write(data);
|
fsOut.write(data);
|
||||||
fsOut.close();
|
fsOut.close();
|
||||||
fsOut = null;
|
fsOut = null;
|
||||||
@ -768,10 +795,11 @@ private void writeFile(Path outputPath, byte[] data) throws Exception {
|
|||||||
* data to .new file and then rename it. Here we are assuming that rename is
|
* data to .new file and then rename it. Here we are assuming that rename is
|
||||||
* atomic for underlying file system.
|
* atomic for underlying file system.
|
||||||
*/
|
*/
|
||||||
protected void updateFile(Path outputPath, byte[] data) throws Exception {
|
protected void updateFile(Path outputPath, byte[] data, boolean
|
||||||
|
makeUnradableByAdmin) throws Exception {
|
||||||
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
|
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
|
||||||
// use writeFileWithRetries to make sure .new file is created atomically
|
// use writeFileWithRetries to make sure .new file is created atomically
|
||||||
writeFileWithRetries(newPath, data);
|
writeFileWithRetries(newPath, data, makeUnradableByAdmin);
|
||||||
replaceFile(newPath, outputPath);
|
replaceFile(newPath, outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -810,9 +838,9 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
|||||||
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
||||||
byte[] stateData = data.getProto().toByteArray();
|
byte[] stateData = data.getProto().toByteArray();
|
||||||
if (isUpdate) {
|
if (isUpdate) {
|
||||||
updateFile(nodeCreatePath, stateData);
|
updateFile(nodeCreatePath, stateData, true);
|
||||||
} else {
|
} else {
|
||||||
writeFileWithRetries(nodeCreatePath, stateData);
|
writeFileWithRetries(nodeCreatePath, stateData, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,4 +853,13 @@ public int getNumRetries() {
|
|||||||
public long getRetryInterval() {
|
public long getRetryInterval() {
|
||||||
return fsRetryInterval;
|
return fsRetryInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setUnreadableBySuperuserXattrib(Path p)
|
||||||
|
throws IOException {
|
||||||
|
if (isHDFS &&
|
||||||
|
!fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
|
||||||
|
fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
|
||||||
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -112,6 +112,12 @@ public EventHandler getEventHandler() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class StoreStateVerifier {
|
||||||
|
void afterStoreApp(RMStateStore store, ApplicationId appId) {}
|
||||||
|
void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
|
||||||
|
appAttId) {}
|
||||||
|
}
|
||||||
|
|
||||||
interface RMStateStoreHelper {
|
interface RMStateStoreHelper {
|
||||||
RMStateStore getRMStateStore() throws Exception;
|
RMStateStore getRMStateStore() throws Exception;
|
||||||
boolean isFinalStateValid() throws Exception;
|
boolean isFinalStateValid() throws Exception;
|
||||||
@ -173,7 +179,7 @@ protected ContainerId storeAttempt(RMStateStore store,
|
|||||||
when(mockAttempt.getRMAppAttemptMetrics())
|
when(mockAttempt.getRMAppAttemptMetrics())
|
||||||
.thenReturn(mockRmAppAttemptMetrics);
|
.thenReturn(mockRmAppAttemptMetrics);
|
||||||
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
||||||
.thenReturn(new AggregateAppResourceUsage(0,0));
|
.thenReturn(new AggregateAppResourceUsage(0, 0));
|
||||||
dispatcher.attemptId = attemptId;
|
dispatcher.attemptId = attemptId;
|
||||||
store.storeNewApplicationAttempt(mockAttempt);
|
store.storeNewApplicationAttempt(mockAttempt);
|
||||||
waitNotify(dispatcher);
|
waitNotify(dispatcher);
|
||||||
@ -181,6 +187,12 @@ protected ContainerId storeAttempt(RMStateStore store,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
||||||
|
throws Exception {
|
||||||
|
testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
|
||||||
|
StoreStateVerifier verifier)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
long submitTime = System.currentTimeMillis();
|
long submitTime = System.currentTimeMillis();
|
||||||
long startTime = System.currentTimeMillis() + 1234;
|
long startTime = System.currentTimeMillis() + 1234;
|
||||||
@ -205,6 +217,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
|||||||
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
|
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
|
||||||
ApplicationId appId1 = attemptId1.getApplicationId();
|
ApplicationId appId1 = attemptId1.getApplicationId();
|
||||||
storeApp(store, appId1, submitTime, startTime);
|
storeApp(store, appId1, submitTime, startTime);
|
||||||
|
verifier.afterStoreApp(store, appId1);
|
||||||
|
|
||||||
// create application token and client token key for attempt1
|
// create application token and client token key for attempt1
|
||||||
Token<AMRMTokenIdentifier> appAttemptToken1 =
|
Token<AMRMTokenIdentifier> appAttemptToken1 =
|
||||||
@ -236,6 +249,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
|
|||||||
storeApp(store, appIdRemoved, submitTime, startTime);
|
storeApp(store, appIdRemoved, submitTime, startTime);
|
||||||
storeAttempt(store, attemptIdRemoved,
|
storeAttempt(store, attemptIdRemoved,
|
||||||
"container_1352994193343_0002_01_000001", null, null, dispatcher);
|
"container_1352994193343_0002_01_000001", null, null, dispatcher);
|
||||||
|
verifier.afterStoreAppAttempt(store, attemptIdRemoved);
|
||||||
|
|
||||||
RMApp mockRemovedApp = mock(RMApp.class);
|
RMApp mockRemovedApp = mock(RMApp.class);
|
||||||
RMAppAttemptMetrics mockRmAppAttemptMetrics =
|
RMAppAttemptMetrics mockRmAppAttemptMetrics =
|
||||||
|
@ -21,8 +21,15 @@
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -38,7 +45,6 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
@ -56,6 +62,7 @@ class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
|||||||
Path workingDirPathURI;
|
Path workingDirPathURI;
|
||||||
TestFileSystemRMStore store;
|
TestFileSystemRMStore store;
|
||||||
MiniDFSCluster cluster;
|
MiniDFSCluster cluster;
|
||||||
|
boolean adminCheckEnable;
|
||||||
|
|
||||||
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
||||||
|
|
||||||
@ -83,8 +90,9 @@ public Path getAppDir(String appId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
|
||||||
Path workingDirPath = new Path("/Test");
|
Path workingDirPath = new Path("/yarn/Test");
|
||||||
|
this.adminCheckEnable = adminCheckEnable;
|
||||||
this.cluster = cluster;
|
this.cluster = cluster;
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
fs.mkdirs(workingDirPath);
|
fs.mkdirs(workingDirPath);
|
||||||
@ -99,10 +107,10 @@ public RMStateStore getRMStateStore() throws Exception {
|
|||||||
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
|
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
|
||||||
workingDirPathURI.toString());
|
workingDirPathURI.toString());
|
||||||
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
|
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
|
||||||
"100,6000");
|
"100,6000");
|
||||||
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
||||||
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
900L);
|
900L);
|
||||||
this.store = new TestFileSystemRMStore(conf);
|
this.store = new TestFileSystemRMStore(conf);
|
||||||
Assert.assertEquals(store.getNumRetries(), 8);
|
Assert.assertEquals(store.getNumRetries(), 8);
|
||||||
Assert.assertEquals(store.getRetryInterval(), 900L);
|
Assert.assertEquals(store.getRetryInterval(), 900L);
|
||||||
@ -111,6 +119,11 @@ public RMStateStore getRMStateStore() throws Exception {
|
|||||||
store.startInternal();
|
store.startInternal();
|
||||||
Assert.assertTrue(store.fs != previousFs);
|
Assert.assertTrue(store.fs != previousFs);
|
||||||
Assert.assertTrue(store.fs.getConf() == store.fsConf);
|
Assert.assertTrue(store.fs.getConf() == store.fsConf);
|
||||||
|
if (adminCheckEnable) {
|
||||||
|
store.setIsHDFS(true);
|
||||||
|
} else {
|
||||||
|
store.setIsHDFS(false);
|
||||||
|
}
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,8 +136,9 @@ public boolean isFinalStateValid() throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeVersion(Version version) throws Exception {
|
public void writeVersion(Version version) throws Exception {
|
||||||
store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
|
store.updateFile(store.getVersionNode(), ((VersionPBImpl)
|
||||||
.getProto().toByteArray());
|
version)
|
||||||
|
.getProto().toByteArray(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -135,7 +149,7 @@ public Version getCurrentVersion() throws Exception {
|
|||||||
public boolean appExists(RMApp app) throws IOException {
|
public boolean appExists(RMApp app) throws IOException {
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
Path nodePath =
|
Path nodePath =
|
||||||
store.getAppDir(app.getApplicationId().toString());
|
store.getAppDir(app.getApplicationId().toString());
|
||||||
return fs.exists(nodePath);
|
return fs.exists(nodePath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,28 +158,28 @@ public boolean appExists(RMApp app) throws IOException {
|
|||||||
public void testFSRMStateStore() throws Exception {
|
public void testFSRMStateStore() throws Exception {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
fsTester = new TestFSRMStateStoreTester(cluster);
|
fsTester = new TestFSRMStateStoreTester(cluster, false);
|
||||||
// If the state store is FileSystemRMStateStore then add corrupted entry.
|
// If the state store is FileSystemRMStateStore then add corrupted entry.
|
||||||
// It should discard the entry and remove it from file system.
|
// It should discard the entry and remove it from file system.
|
||||||
FSDataOutputStream fsOut = null;
|
FSDataOutputStream fsOut = null;
|
||||||
FileSystemRMStateStore fileSystemRMStateStore =
|
FileSystemRMStateStore fileSystemRMStateStore =
|
||||||
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
||||||
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
|
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
|
||||||
ApplicationAttemptId attemptId3 =
|
ApplicationAttemptId attemptId3 =
|
||||||
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
|
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
|
||||||
Path appDir =
|
Path appDir =
|
||||||
fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
|
fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
|
||||||
Path tempAppAttemptFile =
|
Path tempAppAttemptFile =
|
||||||
new Path(appDir, attemptId3.toString() + ".tmp");
|
new Path(appDir, attemptId3.toString() + ".tmp");
|
||||||
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
|
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
|
||||||
fsOut.write("Some random data ".getBytes());
|
fsOut.write("Some random data ".getBytes());
|
||||||
fsOut.close();
|
fsOut.close();
|
||||||
|
|
||||||
testRMAppStateStore(fsTester);
|
testRMAppStateStore(fsTester);
|
||||||
Assert.assertFalse(fsTester.workingDirPathURI
|
Assert.assertFalse(fsTester.workingDirPathURI
|
||||||
.getFileSystem(conf).exists(tempAppAttemptFile));
|
.getFileSystem(conf).exists(tempAppAttemptFile));
|
||||||
testRMDTSecretManagerStateStore(fsTester);
|
testRMDTSecretManagerStateStore(fsTester);
|
||||||
testCheckVersion(fsTester);
|
testCheckVersion(fsTester);
|
||||||
testEpoch(fsTester);
|
testEpoch(fsTester);
|
||||||
@ -178,13 +192,110 @@ public void testFSRMStateStore() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testHDFSRMStateStore() throws Exception {
|
||||||
|
final HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
UserGroupInformation yarnAdmin =
|
||||||
|
UserGroupInformation.createUserForTesting("yarn",
|
||||||
|
new String[]{"admin"});
|
||||||
|
final MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
cluster.getFileSystem().mkdir(new Path("/yarn"),
|
||||||
|
FsPermission.valueOf("-rwxrwxrwx"));
|
||||||
|
cluster.getFileSystem().setOwner(new Path("/yarn"), "yarn", "admin");
|
||||||
|
final UserGroupInformation hdfsAdmin = UserGroupInformation.getCurrentUser();
|
||||||
|
final StoreStateVerifier verifier = new StoreStateVerifier() {
|
||||||
|
@Override
|
||||||
|
void afterStoreApp(final RMStateStore store, final ApplicationId appId) {
|
||||||
|
try {
|
||||||
|
// Wait for things to settle
|
||||||
|
Thread.sleep(5000);
|
||||||
|
hdfsAdmin.doAs(
|
||||||
|
new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
verifyFilesUnreadablebyHDFS(cluster,
|
||||||
|
((FileSystemRMStateStore) store).getAppDir
|
||||||
|
(appId));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void afterStoreAppAttempt(final RMStateStore store,
|
||||||
|
final ApplicationAttemptId appAttId) {
|
||||||
|
try {
|
||||||
|
// Wait for things to settle
|
||||||
|
Thread.sleep(5000);
|
||||||
|
hdfsAdmin.doAs(
|
||||||
|
new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
verifyFilesUnreadablebyHDFS(cluster,
|
||||||
|
((FileSystemRMStateStore) store)
|
||||||
|
.getAppAttemptDir(appAttId));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
yarnAdmin.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
fsTester = new TestFSRMStateStoreTester(cluster, true);
|
||||||
|
testRMAppStateStore(fsTester, verifier);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyFilesUnreadablebyHDFS(MiniDFSCluster cluster,
|
||||||
|
Path root) throws Exception{
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
Queue<Path> paths = new LinkedList<>();
|
||||||
|
paths.add(root);
|
||||||
|
while (!paths.isEmpty()) {
|
||||||
|
Path p = paths.poll();
|
||||||
|
FileStatus stat = fs.getFileStatus(p);
|
||||||
|
if (!stat.isDirectory()) {
|
||||||
|
try {
|
||||||
|
LOG.warn("\n\n ##Testing path [" + p + "]\n\n");
|
||||||
|
fs.open(p);
|
||||||
|
Assert.fail("Super user should not be able to read ["+ UserGroupInformation.getCurrentUser() + "] [" + p.getName() + "]");
|
||||||
|
} catch (AccessControlException e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains("superuser is not allowed to perform this operation"));
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail("Should get an AccessControlException here");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (stat.isDirectory()) {
|
||||||
|
FileStatus[] ls = fs.listStatus(p);
|
||||||
|
for (FileStatus f : ls) {
|
||||||
|
paths.add(f.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testCheckMajorVersionChange() throws Exception {
|
public void testCheckMajorVersionChange() throws Exception {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
try {
|
try {
|
||||||
fsTester = new TestFSRMStateStoreTester(cluster) {
|
fsTester = new TestFSRMStateStoreTester(cluster, false) {
|
||||||
Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
|
Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -238,14 +349,14 @@ protected void modifyAppState() throws Exception {
|
|||||||
ApplicationAttemptId attemptId1 =
|
ApplicationAttemptId attemptId1 =
|
||||||
ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
|
ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
|
||||||
Path appDir =
|
Path appDir =
|
||||||
fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
|
fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
|
||||||
Path appAttemptFile1 =
|
Path appAttemptFile1 =
|
||||||
new Path(appDir, attemptId1.toString() + ".new");
|
new Path(appDir, attemptId1.toString() + ".new");
|
||||||
FileSystemRMStateStore fileSystemRMStateStore =
|
FileSystemRMStateStore fileSystemRMStateStore =
|
||||||
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
||||||
fileSystemRMStateStore.renameFile(appAttemptFile1,
|
fileSystemRMStateStore.renameFile(appAttemptFile1,
|
||||||
new Path(appAttemptFile1.getParent(),
|
new Path(appAttemptFile1.getParent(),
|
||||||
appAttemptFile1.getName() + ".new"));
|
appAttemptFile1.getName() + ".new"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -268,7 +379,7 @@ public void testFSRMStateStoreClientRetry() throws Exception {
|
|||||||
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
try {
|
try {
|
||||||
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
|
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster, false);
|
||||||
final RMStateStore store = fsTester.getRMStateStore();
|
final RMStateStore store = fsTester.getRMStateStore();
|
||||||
store.setRMDispatcher(new TestDispatcher());
|
store.setRMDispatcher(new TestDispatcher());
|
||||||
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
||||||
|
@ -53,7 +53,7 @@ When there are multiple RMs, the configuration (yarn-site.xml) used by clients a
|
|||||||
|
|
||||||
### Recovering prevous active-RM's state
|
### Recovering prevous active-RM's state
|
||||||
|
|
||||||
With the [ResourceManger Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role.
|
With the [ResourceManger Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role. When using the ZKRMStateStore, it is advisable to NOT set the "`zookeeper.DigestAuthenticationProvider.superDigest`" property on the Zookeeper cluster to ensure that the zookeeper admin does not have access to YARN application/user credential information.
|
||||||
|
|
||||||
Deployment
|
Deployment
|
||||||
----------
|
----------
|
||||||
|
Loading…
Reference in New Issue
Block a user