YARN-1342. Recover container tokens upon nodemanager restart. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06e5c5cb2d
commit
2050e0dad6
@ -62,6 +62,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
|
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
|
||||||
(Li Lu via jianhe)
|
(Li Lu via jianhe)
|
||||||
|
|
||||||
|
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
|
||||||
|
devaraj)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -43,7 +43,7 @@ public class BaseContainerTokenSecretManager extends
|
|||||||
private static Log LOG = LogFactory
|
private static Log LOG = LogFactory
|
||||||
.getLog(BaseContainerTokenSecretManager.class);
|
.getLog(BaseContainerTokenSecretManager.class);
|
||||||
|
|
||||||
private int serialNo = new SecureRandom().nextInt();
|
protected int serialNo = new SecureRandom().nextInt();
|
||||||
|
|
||||||
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||||
protected final Lock readLock = readWriteLock.readLock();
|
protected final Lock readLock = readWriteLock.readLock();
|
||||||
|
@ -173,8 +173,8 @@ private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
|
|||||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (nmStore.canRecover()) {
|
if (nmStore.canRecover()) {
|
||||||
nmTokenSecretManager.recover(nmStore.loadNMTokenState());
|
nmTokenSecretManager.recover();
|
||||||
// TODO: recover containerTokenSecretManager
|
containerTokenSecretManager.recover();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +190,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
initAndStartRecoveryStore(conf);
|
initAndStartRecoveryStore(conf);
|
||||||
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager =
|
NMContainerTokenSecretManager containerTokenSecretManager =
|
||||||
new NMContainerTokenSecretManager(conf);
|
new NMContainerTokenSecretManager(conf, nmStore);
|
||||||
|
|
||||||
NMTokenSecretManagerInNM nmTokenSecretManager =
|
NMTokenSecretManagerInNM nmTokenSecretManager =
|
||||||
new NMTokenSecretManagerInNM(nmStore);
|
new NMTokenSecretManagerInNM(nmStore);
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
@ -90,6 +91,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||||||
NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
|
NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
|
||||||
private static final String NM_TOKENS_PREV_MASTER_KEY =
|
private static final String NM_TOKENS_PREV_MASTER_KEY =
|
||||||
NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
|
NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
|
||||||
|
private static final String CONTAINER_TOKENS_KEY_PREFIX =
|
||||||
|
"ContainerTokens/";
|
||||||
|
private static final String CONTAINER_TOKENS_CURRENT_MASTER_KEY =
|
||||||
|
CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
|
||||||
|
private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
|
||||||
|
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
|
|
||||||
@ -141,7 +148,7 @@ public RecoveredLocalizationState loadLocalizationState()
|
|||||||
key.substring(0, userEndPos+1)));
|
key.substring(0, userEndPos+1)));
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
if (iter != null) {
|
if (iter != null) {
|
||||||
iter.close();
|
iter.close();
|
||||||
@ -260,7 +267,7 @@ public void startResourceLocalization(String user, ApplicationId appId,
|
|||||||
try {
|
try {
|
||||||
db.put(bytes(key), proto.toByteArray());
|
db.put(bytes(key), proto.toByteArray());
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,7 +290,7 @@ public void finishResourceLocalization(String user, ApplicationId appId,
|
|||||||
batch.close();
|
batch.close();
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,7 +313,7 @@ public void removeLocalizedResource(String user, ApplicationId appId,
|
|||||||
batch.close();
|
batch.close();
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -355,7 +362,7 @@ public RecoveredDeletionServiceState loadDeletionServiceState()
|
|||||||
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
|
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
if (iter != null) {
|
if (iter != null) {
|
||||||
iter.close();
|
iter.close();
|
||||||
@ -371,7 +378,7 @@ public void storeDeletionTask(int taskId,
|
|||||||
try {
|
try {
|
||||||
db.put(bytes(key), taskProto.toByteArray());
|
db.put(bytes(key), taskProto.toByteArray());
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,14 +388,14 @@ public void removeDeletionTask(int taskId) throws IOException {
|
|||||||
try {
|
try {
|
||||||
db.delete(bytes(key));
|
db.delete(bytes(key));
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||||
RecoveredNMTokenState state = new RecoveredNMTokenState();
|
RecoveredNMTokensState state = new RecoveredNMTokensState();
|
||||||
state.applicationMasterKeys =
|
state.applicationMasterKeys =
|
||||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||||
LeveldbIterator iter = null;
|
LeveldbIterator iter = null;
|
||||||
@ -420,7 +427,7 @@ public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
} finally {
|
} finally {
|
||||||
if (iter != null) {
|
if (iter != null) {
|
||||||
iter.close();
|
iter.close();
|
||||||
@ -454,7 +461,7 @@ public void removeNMTokenApplicationMasterKey(
|
|||||||
try {
|
try {
|
||||||
db.delete(bytes(key));
|
db.delete(bytes(key));
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -468,7 +475,91 @@ private void storeMasterKey(String dbKey, MasterKey key)
|
|||||||
try {
|
try {
|
||||||
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredContainerTokensState loadContainerTokensState()
|
||||||
|
throws IOException {
|
||||||
|
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
|
||||||
|
state.activeTokens = new HashMap<ContainerId, Long>();
|
||||||
|
LeveldbIterator iter = null;
|
||||||
|
try {
|
||||||
|
iter = new LeveldbIterator(db);
|
||||||
|
iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
|
||||||
|
final int containerTokensKeyPrefixLength =
|
||||||
|
CONTAINER_TOKENS_KEY_PREFIX.length();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Entry<byte[], byte[]> entry = iter.next();
|
||||||
|
String fullKey = asString(entry.getKey());
|
||||||
|
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
String key = fullKey.substring(containerTokensKeyPrefixLength);
|
||||||
|
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
|
||||||
|
state.currentMasterKey = parseMasterKey(entry.getValue());
|
||||||
|
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
|
||||||
|
state.previousMasterKey = parseMasterKey(entry.getValue());
|
||||||
|
} else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
||||||
|
loadContainerToken(state, fullKey, key, entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} finally {
|
||||||
|
if (iter != null) {
|
||||||
|
iter.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void loadContainerToken(RecoveredContainerTokensState state,
|
||||||
|
String key, String containerIdStr, byte[] value) throws IOException {
|
||||||
|
ContainerId containerId;
|
||||||
|
Long expTime;
|
||||||
|
try {
|
||||||
|
containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||||
|
expTime = Long.parseLong(asString(value));
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new IOException("Bad container token state for " + key, e);
|
||||||
|
}
|
||||||
|
state.activeTokens.put(containerId, expTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
storeMasterKey(CONTAINER_TOKENS_CURRENT_MASTER_KEY, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
storeMasterKey(CONTAINER_TOKENS_PREV_MASTER_KEY, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerToken(ContainerId containerId, Long expTime)
|
||||||
|
throws IOException {
|
||||||
|
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
|
||||||
|
try {
|
||||||
|
db.put(bytes(key), bytes(expTime.toString()));
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerToken(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
|
||||||
|
try {
|
||||||
|
db.delete(bytes(key));
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -554,7 +645,7 @@ private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
|
|||||||
try {
|
try {
|
||||||
db.put(bytes(key), data);
|
db.put(bytes(key), data);
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e.getMessage(), e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
@ -80,7 +81,7 @@ public void removeDeletionTask(int taskId) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Recovery not supported by this state store");
|
"Recovery not supported by this state store");
|
||||||
}
|
}
|
||||||
@ -105,6 +106,33 @@ public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredContainerTokensState loadContainerTokensState()
|
||||||
|
throws IOException {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Recovery not supported by this state store");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerToken(ContainerId containerId,
|
||||||
|
Long expirationTime) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerToken(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initStorage(Configuration conf) throws IOException {
|
protected void initStorage(Configuration conf) throws IOException {
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
@ -102,7 +103,7 @@ public List<DeletionServiceDeleteTaskProto> getTasks() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RecoveredNMTokenState {
|
public static class RecoveredNMTokensState {
|
||||||
MasterKey currentMasterKey;
|
MasterKey currentMasterKey;
|
||||||
MasterKey previousMasterKey;
|
MasterKey previousMasterKey;
|
||||||
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
|
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
|
||||||
@ -120,6 +121,24 @@ public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class RecoveredContainerTokensState {
|
||||||
|
MasterKey currentMasterKey;
|
||||||
|
MasterKey previousMasterKey;
|
||||||
|
Map<ContainerId, Long> activeTokens;
|
||||||
|
|
||||||
|
public MasterKey getCurrentMasterKey() {
|
||||||
|
return currentMasterKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterKey getPreviousMasterKey() {
|
||||||
|
return previousMasterKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<ContainerId, Long> getActiveTokens() {
|
||||||
|
return activeTokens;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Initialize the state storage */
|
/** Initialize the state storage */
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws IOException {
|
public void serviceInit(Configuration conf) throws IOException {
|
||||||
@ -193,7 +212,8 @@ public abstract void storeDeletionTask(int taskId,
|
|||||||
public abstract void removeDeletionTask(int taskId) throws IOException;
|
public abstract void removeDeletionTask(int taskId) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
public abstract RecoveredNMTokenState loadNMTokenState() throws IOException;
|
public abstract RecoveredNMTokensState loadNMTokensState()
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
|
public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
@ -208,6 +228,22 @@ public abstract void removeNMTokenApplicationMasterKey(
|
|||||||
ApplicationAttemptId attempt) throws IOException;
|
ApplicationAttemptId attempt) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
|
public abstract RecoveredContainerTokensState loadContainerTokensState()
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public abstract void storeContainerToken(ContainerId containerId,
|
||||||
|
Long expirationTime) throws IOException;
|
||||||
|
|
||||||
|
public abstract void removeContainerToken(ContainerId containerId)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
|
||||||
protected abstract void initStorage(Configuration conf) throws IOException;
|
protected abstract void initStorage(Configuration conf) throws IOException;
|
||||||
|
|
||||||
protected abstract void startStorage() throws IOException;
|
protected abstract void startStorage() throws IOException;
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.security;
|
package org.apache.hadoop.yarn.server.nodemanager.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -33,6 +34,9 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||||
|
|
||||||
@ -49,14 +53,74 @@ public class NMContainerTokenSecretManager extends
|
|||||||
|
|
||||||
private MasterKeyData previousMasterKey;
|
private MasterKeyData previousMasterKey;
|
||||||
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
|
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
|
||||||
|
private final NMStateStoreService stateStore;
|
||||||
|
|
||||||
private String nodeHostAddr;
|
private String nodeHostAddr;
|
||||||
|
|
||||||
public NMContainerTokenSecretManager(Configuration conf) {
|
public NMContainerTokenSecretManager(Configuration conf) {
|
||||||
|
this(conf, new NMNullStateStoreService());
|
||||||
|
}
|
||||||
|
|
||||||
|
public NMContainerTokenSecretManager(Configuration conf,
|
||||||
|
NMStateStoreService stateStore) {
|
||||||
super(conf);
|
super(conf);
|
||||||
recentlyStartedContainerTracker =
|
recentlyStartedContainerTracker =
|
||||||
new TreeMap<Long, List<ContainerId>>();
|
new TreeMap<Long, List<ContainerId>>();
|
||||||
|
this.stateStore = stateStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void recover()
|
||||||
|
throws IOException {
|
||||||
|
RecoveredContainerTokensState state =
|
||||||
|
stateStore.loadContainerTokensState();
|
||||||
|
MasterKey key = state.getCurrentMasterKey();
|
||||||
|
if (key != null) {
|
||||||
|
super.currentMasterKey =
|
||||||
|
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
|
||||||
|
}
|
||||||
|
|
||||||
|
key = state.getPreviousMasterKey();
|
||||||
|
if (key != null) {
|
||||||
|
previousMasterKey =
|
||||||
|
new MasterKeyData(key, createSecretKey(key.getBytes().array()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore the serial number from the current master key
|
||||||
|
if (super.currentMasterKey != null) {
|
||||||
|
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
|
||||||
|
ContainerId containerId = entry.getKey();
|
||||||
|
Long expTime = entry.getValue();
|
||||||
|
List<ContainerId> containerList =
|
||||||
|
recentlyStartedContainerTracker.get(expTime);
|
||||||
|
if (containerList == null) {
|
||||||
|
containerList = new ArrayList<ContainerId>();
|
||||||
|
recentlyStartedContainerTracker.put(expTime, containerList);
|
||||||
|
}
|
||||||
|
if (!containerList.contains(containerId)) {
|
||||||
|
containerList.add(containerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateCurrentMasterKey(MasterKeyData key) {
|
||||||
|
super.currentMasterKey = key;
|
||||||
|
try {
|
||||||
|
stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to update current master key in state store", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updatePreviousMasterKey(MasterKeyData key) {
|
||||||
|
previousMasterKey = key;
|
||||||
|
try {
|
||||||
|
stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to update previous master key in state store", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -68,21 +132,16 @@ public NMContainerTokenSecretManager(Configuration conf) {
|
|||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
public synchronized void setMasterKey(MasterKey masterKeyRecord) {
|
public synchronized void setMasterKey(MasterKey masterKeyRecord) {
|
||||||
LOG.info("Rolling master-key for container-tokens, got key with id "
|
// Update keys only if the key has changed.
|
||||||
+ masterKeyRecord.getKeyId());
|
if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
|
||||||
if (super.currentMasterKey == null) {
|
.getKeyId() != masterKeyRecord.getKeyId()) {
|
||||||
super.currentMasterKey =
|
LOG.info("Rolling master-key for container-tokens, got key with id "
|
||||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
+ masterKeyRecord.getKeyId());
|
||||||
.getBytes().array()));
|
if (super.currentMasterKey != null) {
|
||||||
} else {
|
updatePreviousMasterKey(super.currentMasterKey);
|
||||||
if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
|
|
||||||
.getKeyId()) {
|
|
||||||
// Update keys only if the key has changed.
|
|
||||||
this.previousMasterKey = super.currentMasterKey;
|
|
||||||
super.currentMasterKey =
|
|
||||||
new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
|
|
||||||
.getBytes().array()));
|
|
||||||
}
|
}
|
||||||
|
updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
|
||||||
|
createSecretKey(masterKeyRecord.getBytes().array())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,14 +196,19 @@ public synchronized void startContainerSuccessful(
|
|||||||
|
|
||||||
removeAnyContainerTokenIfExpired();
|
removeAnyContainerTokenIfExpired();
|
||||||
|
|
||||||
|
ContainerId containerId = tokenId.getContainerID();
|
||||||
Long expTime = tokenId.getExpiryTimeStamp();
|
Long expTime = tokenId.getExpiryTimeStamp();
|
||||||
// We might have multiple containers with same expiration time.
|
// We might have multiple containers with same expiration time.
|
||||||
if (!recentlyStartedContainerTracker.containsKey(expTime)) {
|
if (!recentlyStartedContainerTracker.containsKey(expTime)) {
|
||||||
recentlyStartedContainerTracker
|
recentlyStartedContainerTracker
|
||||||
.put(expTime, new ArrayList<ContainerId>());
|
.put(expTime, new ArrayList<ContainerId>());
|
||||||
}
|
}
|
||||||
recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
|
recentlyStartedContainerTracker.get(expTime).add(containerId);
|
||||||
|
try {
|
||||||
|
stateStore.storeContainerToken(containerId, expTime);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to store token for container " + containerId, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void removeAnyContainerTokenIfExpired() {
|
protected synchronized void removeAnyContainerTokenIfExpired() {
|
||||||
@ -155,6 +219,13 @@ protected synchronized void removeAnyContainerTokenIfExpired() {
|
|||||||
while (containersI.hasNext()) {
|
while (containersI.hasNext()) {
|
||||||
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
|
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
|
||||||
if (containerEntry.getKey() < currTime) {
|
if (containerEntry.getKey() < currTime) {
|
||||||
|
for (ContainerId container : containerEntry.getValue()) {
|
||||||
|
try {
|
||||||
|
stateStore.removeContainerToken(container);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to remove token for container " + container, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
containersI.remove();
|
containersI.remove();
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -34,7 +34,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||||
|
|
||||||
@ -64,8 +64,9 @@ public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
|
|||||||
this.stateStore = stateStore;
|
this.stateStore = stateStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void recover(RecoveredNMTokenState state)
|
public synchronized void recover()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
RecoveredNMTokensState state = stateStore.loadNMTokensState();
|
||||||
MasterKey key = state.getCurrentMasterKey();
|
MasterKey key = state.getCurrentMasterKey();
|
||||||
if (key != null) {
|
if (key != null) {
|
||||||
super.currentMasterKey =
|
super.currentMasterKey =
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
@ -36,7 +37,8 @@
|
|||||||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
private Map<TrackerKey, TrackerState> trackerStates;
|
private Map<TrackerKey, TrackerState> trackerStates;
|
||||||
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
||||||
private RecoveredNMTokenState nmTokenState;
|
private RecoveredNMTokensState nmTokenState;
|
||||||
|
private RecoveredContainerTokensState containerTokenState;
|
||||||
|
|
||||||
public NMMemoryStateStoreService() {
|
public NMMemoryStateStoreService() {
|
||||||
super(NMMemoryStateStoreService.class.getName());
|
super(NMMemoryStateStoreService.class.getName());
|
||||||
@ -117,12 +119,13 @@ public synchronized void removeLocalizedResource(String user,
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initStorage(Configuration conf) {
|
protected void initStorage(Configuration conf) {
|
||||||
nmTokenState = new RecoveredNMTokenState();
|
nmTokenState = new RecoveredNMTokensState();
|
||||||
nmTokenState.applicationMasterKeys =
|
nmTokenState.applicationMasterKeys =
|
||||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||||
|
containerTokenState = new RecoveredContainerTokensState();
|
||||||
|
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
|
||||||
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
||||||
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -157,9 +160,9 @@ public synchronized void removeDeletionTask(int taskId) throws IOException {
|
|||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredNMTokenState loadNMTokenState() throws IOException {
|
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||||
// return a copy so caller can't modify our state
|
// return a copy so caller can't modify our state
|
||||||
RecoveredNMTokenState result = new RecoveredNMTokenState();
|
RecoveredNMTokensState result = new RecoveredNMTokensState();
|
||||||
result.currentMasterKey = nmTokenState.currentMasterKey;
|
result.currentMasterKey = nmTokenState.currentMasterKey;
|
||||||
result.previousMasterKey = nmTokenState.previousMasterKey;
|
result.previousMasterKey = nmTokenState.previousMasterKey;
|
||||||
result.applicationMasterKeys =
|
result.applicationMasterKeys =
|
||||||
@ -197,6 +200,48 @@ public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredContainerTokensState loadContainerTokensState()
|
||||||
|
throws IOException {
|
||||||
|
// return a copy so caller can't modify our state
|
||||||
|
RecoveredContainerTokensState result =
|
||||||
|
new RecoveredContainerTokensState();
|
||||||
|
result.currentMasterKey = containerTokenState.currentMasterKey;
|
||||||
|
result.previousMasterKey = containerTokenState.previousMasterKey;
|
||||||
|
result.activeTokens =
|
||||||
|
new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenCurrentMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
|
||||||
|
containerTokenState.currentMasterKey =
|
||||||
|
new MasterKeyPBImpl(keypb.getProto());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerTokenPreviousMasterKey(MasterKey key)
|
||||||
|
throws IOException {
|
||||||
|
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
|
||||||
|
containerTokenState.previousMasterKey =
|
||||||
|
new MasterKeyPBImpl(keypb.getProto());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerToken(ContainerId containerId,
|
||||||
|
Long expirationTime) throws IOException {
|
||||||
|
containerTokenState.activeTokens.put(containerId, expirationTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerToken(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
containerTokenState.activeTokens.remove(containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class TrackerState {
|
private static class TrackerState {
|
||||||
Map<Path, LocalResourceProto> inProgressMap =
|
Map<Path, LocalResourceProto> inProgressMap =
|
||||||
new HashMap<Path, LocalResourceProto>();
|
new HashMap<Path, LocalResourceProto>();
|
||||||
|
@ -27,11 +27,13 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.service.ServiceStateException;
|
import org.apache.hadoop.service.ServiceStateException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
@ -42,12 +44,15 @@
|
|||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
||||||
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -502,7 +507,7 @@ public void testDeletionTaskStorage() throws IOException {
|
|||||||
@Test
|
@Test
|
||||||
public void testNMTokenStorage() throws IOException {
|
public void testNMTokenStorage() throws IOException {
|
||||||
// test empty when no state
|
// test empty when no state
|
||||||
RecoveredNMTokenState state = stateStore.loadNMTokenState();
|
RecoveredNMTokensState state = stateStore.loadNMTokensState();
|
||||||
assertNull(state.getCurrentMasterKey());
|
assertNull(state.getCurrentMasterKey());
|
||||||
assertNull(state.getPreviousMasterKey());
|
assertNull(state.getPreviousMasterKey());
|
||||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||||
@ -512,7 +517,7 @@ public void testNMTokenStorage() throws IOException {
|
|||||||
MasterKey currentKey = secretMgr.generateKey();
|
MasterKey currentKey = secretMgr.generateKey();
|
||||||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||||
restartStateStore();
|
restartStateStore();
|
||||||
state = stateStore.loadNMTokenState();
|
state = stateStore.loadNMTokensState();
|
||||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
assertNull(state.getPreviousMasterKey());
|
assertNull(state.getPreviousMasterKey());
|
||||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||||
@ -521,7 +526,7 @@ public void testNMTokenStorage() throws IOException {
|
|||||||
MasterKey prevKey = secretMgr.generateKey();
|
MasterKey prevKey = secretMgr.generateKey();
|
||||||
stateStore.storeNMTokenPreviousMasterKey(prevKey);
|
stateStore.storeNMTokenPreviousMasterKey(prevKey);
|
||||||
restartStateStore();
|
restartStateStore();
|
||||||
state = stateStore.loadNMTokenState();
|
state = stateStore.loadNMTokensState();
|
||||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||||
@ -536,7 +541,7 @@ public void testNMTokenStorage() throws IOException {
|
|||||||
MasterKey attemptKey2 = secretMgr.generateKey();
|
MasterKey attemptKey2 = secretMgr.generateKey();
|
||||||
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
|
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
|
||||||
restartStateStore();
|
restartStateStore();
|
||||||
state = stateStore.loadNMTokenState();
|
state = stateStore.loadNMTokensState();
|
||||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
|
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
|
||||||
@ -558,7 +563,7 @@ public void testNMTokenStorage() throws IOException {
|
|||||||
currentKey = secretMgr.generateKey();
|
currentKey = secretMgr.generateKey();
|
||||||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||||
restartStateStore();
|
restartStateStore();
|
||||||
state = stateStore.loadNMTokenState();
|
state = stateStore.loadNMTokensState();
|
||||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
loadedAppKeys = state.getApplicationMasterKeys();
|
loadedAppKeys = state.getApplicationMasterKeys();
|
||||||
@ -568,10 +573,89 @@ public void testNMTokenStorage() throws IOException {
|
|||||||
assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
|
assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerTokenStorage() throws IOException {
|
||||||
|
// test empty when no state
|
||||||
|
RecoveredContainerTokensState state =
|
||||||
|
stateStore.loadContainerTokensState();
|
||||||
|
assertNull(state.getCurrentMasterKey());
|
||||||
|
assertNull(state.getPreviousMasterKey());
|
||||||
|
assertTrue(state.getActiveTokens().isEmpty());
|
||||||
|
|
||||||
|
// store a master key and verify recovered
|
||||||
|
ContainerTokenKeyGeneratorForTest keygen =
|
||||||
|
new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
|
||||||
|
MasterKey currentKey = keygen.generateKey();
|
||||||
|
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadContainerTokensState();
|
||||||
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
|
assertNull(state.getPreviousMasterKey());
|
||||||
|
assertTrue(state.getActiveTokens().isEmpty());
|
||||||
|
|
||||||
|
// store a previous key and verify recovered
|
||||||
|
MasterKey prevKey = keygen.generateKey();
|
||||||
|
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadContainerTokensState();
|
||||||
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
|
assertTrue(state.getActiveTokens().isEmpty());
|
||||||
|
|
||||||
|
// store a few container tokens and verify recovered
|
||||||
|
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
|
Long expTime1 = 1234567890L;
|
||||||
|
ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
|
||||||
|
Long expTime2 = 9876543210L;
|
||||||
|
stateStore.storeContainerToken(cid1, expTime1);
|
||||||
|
stateStore.storeContainerToken(cid2, expTime2);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadContainerTokensState();
|
||||||
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
|
Map<ContainerId, Long> loadedActiveTokens =
|
||||||
|
state.getActiveTokens();
|
||||||
|
assertEquals(2, loadedActiveTokens.size());
|
||||||
|
assertEquals(expTime1, loadedActiveTokens.get(cid1));
|
||||||
|
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||||
|
|
||||||
|
// add/update/remove tokens and verify recovered
|
||||||
|
ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
|
||||||
|
Long expTime3 = 135798642L;
|
||||||
|
stateStore.storeContainerToken(cid3, expTime3);
|
||||||
|
stateStore.removeContainerToken(cid1);
|
||||||
|
expTime2 += 246897531L;
|
||||||
|
stateStore.storeContainerToken(cid2, expTime2);
|
||||||
|
prevKey = currentKey;
|
||||||
|
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
|
||||||
|
currentKey = keygen.generateKey();
|
||||||
|
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadContainerTokensState();
|
||||||
|
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||||
|
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||||
|
loadedActiveTokens = state.getActiveTokens();
|
||||||
|
assertEquals(2, loadedActiveTokens.size());
|
||||||
|
assertNull(loadedActiveTokens.get(cid1));
|
||||||
|
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||||
|
assertEquals(expTime3, loadedActiveTokens.get(cid3));
|
||||||
|
}
|
||||||
|
|
||||||
private static class NMTokenSecretManagerForTest extends
|
private static class NMTokenSecretManagerForTest extends
|
||||||
BaseNMTokenSecretManager {
|
BaseNMTokenSecretManager {
|
||||||
public MasterKey generateKey() {
|
public MasterKey generateKey() {
|
||||||
return createNewMasterKey().getMasterKey();
|
return createNewMasterKey().getMasterKey();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class ContainerTokenKeyGeneratorForTest extends
|
||||||
|
BaseContainerTokenSecretManager {
|
||||||
|
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterKey generateKey() {
|
||||||
|
return createNewMasterKey().getMasterKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,144 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.security;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestNMContainerTokenSecretManager {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecovery() throws IOException {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
final NodeId nodeId = NodeId.newInstance("somehost", 1234);
|
||||||
|
final ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
|
final ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
|
||||||
|
ContainerTokenKeyGeneratorForTest keygen =
|
||||||
|
new ContainerTokenKeyGeneratorForTest(conf);
|
||||||
|
NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||||
|
stateStore.init(conf);
|
||||||
|
stateStore.start();
|
||||||
|
NMContainerTokenSecretManager secretMgr =
|
||||||
|
new NMContainerTokenSecretManager(conf, stateStore);
|
||||||
|
secretMgr.setNodeId(nodeId);
|
||||||
|
MasterKey currentKey = keygen.generateKey();
|
||||||
|
secretMgr.setMasterKey(currentKey);
|
||||||
|
ContainerTokenIdentifier tokenId1 =
|
||||||
|
createContainerTokenId(cid1, nodeId, "user1", secretMgr);
|
||||||
|
ContainerTokenIdentifier tokenId2 =
|
||||||
|
createContainerTokenId(cid2, nodeId, "user2", secretMgr);
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||||
|
|
||||||
|
// restart and verify tokens still valid
|
||||||
|
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||||
|
secretMgr.setNodeId(nodeId);
|
||||||
|
secretMgr.recover();
|
||||||
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
|
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||||
|
assertTrue(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||||
|
|
||||||
|
// roll master key and start a container
|
||||||
|
secretMgr.startContainerSuccessful(tokenId2);
|
||||||
|
currentKey = keygen.generateKey();
|
||||||
|
secretMgr.setMasterKey(currentKey);
|
||||||
|
|
||||||
|
// restart and verify tokens still valid due to prev key persist
|
||||||
|
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||||
|
secretMgr.setNodeId(nodeId);
|
||||||
|
secretMgr.recover();
|
||||||
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
|
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||||
|
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId1));
|
||||||
|
assertNotNull(secretMgr.retrievePassword(tokenId2));
|
||||||
|
|
||||||
|
// roll master key again, restart, and verify keys no longer valid
|
||||||
|
currentKey = keygen.generateKey();
|
||||||
|
secretMgr.setMasterKey(currentKey);
|
||||||
|
secretMgr = new NMContainerTokenSecretManager(conf, stateStore);
|
||||||
|
secretMgr.setNodeId(nodeId);
|
||||||
|
secretMgr.recover();
|
||||||
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
|
assertTrue(secretMgr.isValidStartContainerRequest(tokenId1));
|
||||||
|
assertFalse(secretMgr.isValidStartContainerRequest(tokenId2));
|
||||||
|
try {
|
||||||
|
secretMgr.retrievePassword(tokenId1);
|
||||||
|
fail("token should not be valid");
|
||||||
|
} catch (InvalidToken e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
secretMgr.retrievePassword(tokenId2);
|
||||||
|
fail("token should not be valid");
|
||||||
|
} catch (InvalidToken e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
stateStore.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ContainerTokenIdentifier createContainerTokenId(
|
||||||
|
ContainerId cid, NodeId nodeId, String user,
|
||||||
|
NMContainerTokenSecretManager secretMgr) throws IOException {
|
||||||
|
long rmid = cid.getApplicationAttemptId().getApplicationId()
|
||||||
|
.getClusterTimestamp();
|
||||||
|
ContainerTokenIdentifier ctid = new ContainerTokenIdentifier(cid,
|
||||||
|
nodeId.toString(), user, BuilderUtils.newResource(1024, 1),
|
||||||
|
System.currentTimeMillis() + 100000L,
|
||||||
|
secretMgr.getCurrentKey().getKeyId(), rmid,
|
||||||
|
Priority.newInstance(0), 0);
|
||||||
|
Token token = BuilderUtils.newContainerToken(nodeId,
|
||||||
|
secretMgr.createPassword(ctid), ctid);
|
||||||
|
return BuilderUtils.newContainerTokenIdentifier(token);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ContainerTokenKeyGeneratorForTest extends
|
||||||
|
BaseContainerTokenSecretManager {
|
||||||
|
public ContainerTokenKeyGeneratorForTest(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterKey generateKey() {
|
||||||
|
return createNewMasterKey().getMasterKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -73,7 +73,7 @@ public void testRecovery() throws IOException {
|
|||||||
|
|
||||||
// restart and verify key is still there and token still valid
|
// restart and verify key is still there and token still valid
|
||||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||||
secretMgr.recover(stateStore.loadNMTokenState());
|
secretMgr.recover();
|
||||||
secretMgr.setNodeId(nodeId);
|
secretMgr.setNodeId(nodeId);
|
||||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
assertTrue(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||||
@ -88,7 +88,7 @@ public void testRecovery() throws IOException {
|
|||||||
|
|
||||||
// restart and verify attempt1 key is still valid due to prev key persist
|
// restart and verify attempt1 key is still valid due to prev key persist
|
||||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||||
secretMgr.recover(stateStore.loadNMTokenState());
|
secretMgr.recover();
|
||||||
secretMgr.setNodeId(nodeId);
|
secretMgr.setNodeId(nodeId);
|
||||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||||
@ -101,7 +101,7 @@ public void testRecovery() throws IOException {
|
|||||||
currentKey = keygen.generateKey();
|
currentKey = keygen.generateKey();
|
||||||
secretMgr.setMasterKey(currentKey);
|
secretMgr.setMasterKey(currentKey);
|
||||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||||
secretMgr.recover(stateStore.loadNMTokenState());
|
secretMgr.recover();
|
||||||
secretMgr.setNodeId(nodeId);
|
secretMgr.setNodeId(nodeId);
|
||||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||||
@ -117,7 +117,7 @@ public void testRecovery() throws IOException {
|
|||||||
// remove last attempt, restart, verify both tokens are now bad
|
// remove last attempt, restart, verify both tokens are now bad
|
||||||
secretMgr.appFinished(attempt2.getApplicationId());
|
secretMgr.appFinished(attempt2.getApplicationId());
|
||||||
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
secretMgr = new NMTokenSecretManagerInNM(stateStore);
|
||||||
secretMgr.recover(stateStore.loadNMTokenState());
|
secretMgr.recover();
|
||||||
secretMgr.setNodeId(nodeId);
|
secretMgr.setNodeId(nodeId);
|
||||||
assertEquals(currentKey, secretMgr.getCurrentKey());
|
assertEquals(currentKey, secretMgr.getCurrentKey());
|
||||||
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
assertFalse(secretMgr.isAppAttemptNMTokenKeyPresent(attempt1));
|
||||||
|
Loading…
Reference in New Issue
Block a user