YARN-2958. Made RMStateStore not update the last sequence number when updating the delegation token. Contributed by Varun Saxena.
This commit is contained in:
parent
dfd2589bcb
commit
562a701945
@ -311,6 +311,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
|
||||
(Rohith Sharmaks via ozawa)
|
||||
|
||||
YARN-2958. Made RMStateStore not update the last sequence number when updating the
|
||||
delegation token. (Varun Saxena via zjshen)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -60,8 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@Private
|
||||
@ -452,11 +450,10 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier identifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||
identifier, renewDate,latestSequenceNumber, false);
|
||||
public synchronized void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier identifier, Long renewDate)
|
||||
throws Exception {
|
||||
storeOrUpdateRMDelegationTokenState(identifier, renewDate, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -469,16 +466,15 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||
rmDTIdentifier, renewDate,latestSequenceNumber, true);
|
||||
protected void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
|
||||
}
|
||||
|
||||
private void storeOrUpdateRMDelegationTokenAndSequenceNumberState(
|
||||
private void storeOrUpdateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier identifier, Long renewDate,
|
||||
int latestSequenceNumber, boolean isUpdate) throws Exception {
|
||||
boolean isUpdate) throws Exception {
|
||||
Path nodeCreatePath =
|
||||
getNodePath(rmDTSecretManagerRoot,
|
||||
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
|
||||
@ -490,23 +486,24 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||
} else {
|
||||
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
|
||||
writeFile(nodeCreatePath, identifierData.toByteArray());
|
||||
}
|
||||
|
||||
// store sequence number
|
||||
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
||||
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
|
||||
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
|
||||
+ latestSequenceNumber);
|
||||
if (dtSequenceNumberPath == null) {
|
||||
if (!createFile(latestSequenceNumberPath)) {
|
||||
throw new Exception("Failed to create " + latestSequenceNumberPath);
|
||||
}
|
||||
} else {
|
||||
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
|
||||
throw new Exception("Failed to rename " + dtSequenceNumberPath);
|
||||
// store sequence number
|
||||
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
|
||||
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
|
||||
+ identifier.getSequenceNumber());
|
||||
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
|
||||
+ identifier.getSequenceNumber());
|
||||
if (dtSequenceNumberPath == null) {
|
||||
if (!createFile(latestSequenceNumberPath)) {
|
||||
throw new Exception("Failed to create " + latestSequenceNumberPath);
|
||||
}
|
||||
} else {
|
||||
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
|
||||
throw new Exception("Failed to rename " + dtSequenceNumberPath);
|
||||
}
|
||||
}
|
||||
dtSequenceNumberPath = latestSequenceNumberPath;
|
||||
}
|
||||
dtSequenceNumberPath = latestSequenceNumberPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -544,31 +544,30 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier tokenId, Long renewDate,
|
||||
int latestSequenceNumber) throws IOException {
|
||||
|
||||
private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
|
||||
Long renewDate, boolean isUpdate) throws IOException {
|
||||
String tokenKey = getRMDTTokenNodeKey(tokenId);
|
||||
RMDelegationTokenIdentifierData tokenData =
|
||||
new RMDelegationTokenIdentifierData(tokenId, renewDate);
|
||||
ByteArrayOutputStream bs = new ByteArrayOutputStream();
|
||||
DataOutputStream ds = new DataOutputStream(bs);
|
||||
try {
|
||||
ds.writeInt(latestSequenceNumber);
|
||||
} finally {
|
||||
ds.close();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Storing token to " + tokenKey);
|
||||
LOG.debug("Storing " + latestSequenceNumber + " to "
|
||||
+ RM_DT_SEQUENCE_NUMBER_KEY);
|
||||
}
|
||||
try {
|
||||
WriteBatch batch = db.createWriteBatch();
|
||||
try {
|
||||
batch.put(bytes(tokenKey), tokenData.toByteArray());
|
||||
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
|
||||
if(!isUpdate) {
|
||||
ByteArrayOutputStream bs = new ByteArrayOutputStream();
|
||||
try (DataOutputStream ds = new DataOutputStream(bs)) {
|
||||
ds.writeInt(tokenId.getSequenceNumber());
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Storing " + tokenId.getSequenceNumber() + " to "
|
||||
+ RM_DT_SEQUENCE_NUMBER_KEY);
|
||||
}
|
||||
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
|
||||
}
|
||||
db.write(batch);
|
||||
} finally {
|
||||
batch.close();
|
||||
@ -579,11 +578,17 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier tokenId, Long renewDate,
|
||||
int latestSequenceNumber) throws IOException {
|
||||
storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate,
|
||||
latestSequenceNumber);
|
||||
protected void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier tokenId, Long renewDate)
|
||||
throws IOException {
|
||||
storeOrUpdateRMDT(tokenId, renewDate, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier tokenId, Long renewDate)
|
||||
throws IOException {
|
||||
storeOrUpdateRMDT(tokenId, renewDate, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -149,23 +149,30 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
private void storeOrUpdateRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
|
||||
Long renewDate, boolean isUpdate) throws Exception {
|
||||
Map<RMDelegationTokenIdentifier, Long> rmDTState =
|
||||
state.rmSecretManagerState.getTokenState();
|
||||
if (rmDTState.containsKey(rmDTIdentifier)) {
|
||||
IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
|
||||
+ "is already stored.");
|
||||
+ "is already stored.");
|
||||
LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
|
||||
throw e;
|
||||
}
|
||||
rmDTState.put(rmDTIdentifier, renewDate);
|
||||
state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
|
||||
if(!isUpdate) {
|
||||
state.rmSecretManagerState.dtSequenceNumber =
|
||||
rmDTIdentifier.getSequenceNumber();
|
||||
}
|
||||
LOG.info("Store RMDT with sequence number "
|
||||
+ rmDTIdentifier.getSequenceNumber()
|
||||
+ ". And the latest sequence number is " + latestSequenceNumber);
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
storeOrUpdateRMDT(rmDTIdentifier, renewDate, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -179,12 +186,11 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
protected void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
removeRMDelegationTokenState(rmDTIdentifier);
|
||||
storeRMDelegationTokenAndSequenceNumberState(
|
||||
rmDTIdentifier, renewDate, latestSequenceNumber);
|
||||
storeOrUpdateRMDT(rmDTIdentifier, renewDate, true);
|
||||
LOG.info("Update RMDT with sequence number "
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
}
|
||||
|
@ -77,9 +77,9 @@ public class NullRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
public void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@ -90,9 +90,9 @@ public class NullRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
protected void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
|
@ -296,9 +296,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||
try {
|
||||
LOG.info("Storing RMDelegationToken and SequenceNumber");
|
||||
store.storeRMDelegationTokenAndSequenceNumberState(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
||||
dtEvent.getLatestSequenceNumber());
|
||||
store.storeRMDelegationTokenState(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
|
||||
e);
|
||||
@ -341,9 +340,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||
try {
|
||||
LOG.info("Updating RMDelegationToken and SequenceNumber");
|
||||
store.updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
||||
dtEvent.getLatestSequenceNumber());
|
||||
store.updateRMDelegationTokenState(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
|
||||
e);
|
||||
@ -672,11 +670,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||
* RMDTSecretManager call this to store the state of a delegation token
|
||||
* and sequence number
|
||||
*/
|
||||
public void storeRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
public void storeRMDelegationToken(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber, RMStateStoreEventType.STORE_DELEGATION_TOKEN));
|
||||
RMStateStoreEventType.STORE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -684,17 +681,17 @@ public abstract class RMStateStore extends AbstractService {
|
||||
* Derived classes must implement this method to store the state of
|
||||
* RMDelegationToken and sequence number
|
||||
*/
|
||||
protected abstract void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception;
|
||||
protected abstract void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception;
|
||||
|
||||
/**
|
||||
* RMDTSecretManager call this to remove the state of a delegation token
|
||||
*/
|
||||
public void removeRMDelegationToken(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
|
||||
RMDelegationTokenIdentifier rmDTIdentifier) {
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
|
||||
sequenceNumber, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
|
||||
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -708,11 +705,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||
* RMDTSecretManager call this to update the state of a delegation token
|
||||
* and sequence number
|
||||
*/
|
||||
public void updateRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
public void updateRMDelegationToken(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
|
||||
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -720,9 +716,9 @@ public abstract class RMStateStore extends AbstractService {
|
||||
* Derived classes must implement this method to update the state of
|
||||
* RMDelegationToken and sequence number
|
||||
*/
|
||||
protected abstract void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception;
|
||||
protected abstract void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception;
|
||||
|
||||
/**
|
||||
* RMDTSecretManager call this to store the state of a master key
|
||||
|
@ -23,18 +23,16 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
|
||||
private RMDelegationTokenIdentifier rmDTIdentifier;
|
||||
private Long renewDate;
|
||||
private int latestSequenceNumber;
|
||||
|
||||
public RMStateStoreRMDTEvent(RMStateStoreEventType type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier,
|
||||
Long renewDate, int latestSequenceNumber, RMStateStoreEventType type) {
|
||||
Long renewDate, RMStateStoreEventType type) {
|
||||
this(type);
|
||||
this.rmDTIdentifier = rmDTIdentifier;
|
||||
this.renewDate = renewDate;
|
||||
this.latestSequenceNumber = latestSequenceNumber;
|
||||
}
|
||||
|
||||
public RMDelegationTokenIdentifier getRmDTIdentifier() {
|
||||
@ -44,8 +42,4 @@ public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
|
||||
public Long getRenewDate() {
|
||||
return renewDate;
|
||||
}
|
||||
|
||||
public int getLatestSequenceNumber() {
|
||||
return latestSequenceNumber;
|
||||
}
|
||||
}
|
||||
}
|
@ -698,12 +698,11 @@ public class ZKRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
protected synchronized void storeRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
ArrayList<Op> opList = new ArrayList<Op>();
|
||||
addStoreOrUpdateOps(
|
||||
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
|
||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
||||
doMultiWithRetries(opList);
|
||||
}
|
||||
|
||||
@ -727,29 +726,27 @@ public class ZKRMStateStore extends RMStateStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
protected synchronized void updateRMDelegationTokenState(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
||||
throws Exception {
|
||||
ArrayList<Op> opList = new ArrayList<Op>();
|
||||
String nodeRemovePath =
|
||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
if (existsWithRetries(nodeRemovePath, true) == null) {
|
||||
// in case znode doesn't exist
|
||||
addStoreOrUpdateOps(
|
||||
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
|
||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
||||
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
||||
} else {
|
||||
// in case znode exists
|
||||
addStoreOrUpdateOps(
|
||||
opList, rmDTIdentifier, renewDate, latestSequenceNumber, true);
|
||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
|
||||
}
|
||||
doMultiWithRetries(opList);
|
||||
}
|
||||
|
||||
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber, boolean isUpdate) throws Exception {
|
||||
boolean isUpdate) throws Exception {
|
||||
// store RM delegation token
|
||||
String nodeCreatePath =
|
||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||
@ -769,16 +766,15 @@ public class ZKRMStateStore extends RMStateStore {
|
||||
} else {
|
||||
opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
|
||||
CreateMode.PERSISTENT));
|
||||
// Update Sequence number only while storing DT
|
||||
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug((isUpdate ? "Storing " : "Updating ") +
|
||||
dtSequenceNumberPath + ". SequenceNumber: "
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
}
|
||||
opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
|
||||
}
|
||||
|
||||
|
||||
seqOut.writeInt(latestSequenceNumber);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath +
|
||||
". SequenceNumber: " + latestSequenceNumber);
|
||||
}
|
||||
|
||||
opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
|
||||
} finally {
|
||||
seqOs.close();
|
||||
}
|
||||
|
@ -29,10 +29,8 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
@ -109,8 +107,7 @@ public class RMDelegationTokenSecretManager extends
|
||||
try {
|
||||
LOG.info("storing RMDelegation token with sequence number: "
|
||||
+ identifier.getSequenceNumber());
|
||||
rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
|
||||
identifier, renewDate, identifier.getSequenceNumber());
|
||||
rmContext.getStateStore().storeRMDelegationToken(identifier, renewDate);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in storing RMDelegationToken with sequence number: "
|
||||
+ identifier.getSequenceNumber());
|
||||
@ -124,11 +121,10 @@ public class RMDelegationTokenSecretManager extends
|
||||
try {
|
||||
LOG.info("updating RMDelegation token with sequence number: "
|
||||
+ id.getSequenceNumber());
|
||||
rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
|
||||
renewDate, id.getSequenceNumber());
|
||||
rmContext.getStateStore().updateRMDelegationToken(id, renewDate);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
|
||||
+ id.getSequenceNumber());
|
||||
LOG.error("Error in updating persisted RMDelegationToken" +
|
||||
" with sequence number: " + id.getSequenceNumber());
|
||||
ExitUtil.terminate(1, e);
|
||||
}
|
||||
}
|
||||
@ -139,8 +135,7 @@ public class RMDelegationTokenSecretManager extends
|
||||
try {
|
||||
LOG.info("removing RMDelegation token with sequence number: "
|
||||
+ ident.getSequenceNumber());
|
||||
rmContext.getStateStore().removeRMDelegationToken(ident,
|
||||
delegationTokenSequenceNumber);
|
||||
rmContext.getStateStore().removeRMDelegationToken(ident);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in removing RMDelegationToken with sequence number: "
|
||||
+ ident.getSequenceNumber());
|
||||
|
@ -411,16 +411,15 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
RMDelegationTokenIdentifier dtId1 =
|
||||
new RMDelegationTokenIdentifier(new Text("owner1"),
|
||||
new Text("renewer1"), new Text("realuser1"));
|
||||
int sequenceNumber = 1111;
|
||||
dtId1.setSequenceNumber(sequenceNumber);
|
||||
byte[] tokenBeforeStore = dtId1.getBytes();
|
||||
Long renewDate1 = new Long(System.currentTimeMillis());
|
||||
int sequenceNumber = 1111;
|
||||
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||
sequenceNumber);
|
||||
store.storeRMDelegationToken(dtId1, renewDate1);
|
||||
modifyRMDelegationTokenState();
|
||||
Map<RMDelegationTokenIdentifier, Long> token1 =
|
||||
new HashMap<RMDelegationTokenIdentifier, Long>();
|
||||
token1.put(dtId1, renewDate1);
|
||||
|
||||
// store delegation key;
|
||||
DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
|
||||
HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
|
||||
@ -440,9 +439,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
|
||||
// update RM delegation token;
|
||||
renewDate1 = new Long(System.currentTimeMillis());
|
||||
++sequenceNumber;
|
||||
store.updateRMDelegationTokenAndSequenceNumber(
|
||||
dtId1, renewDate1, sequenceNumber);
|
||||
store.updateRMDelegationToken(dtId1, renewDate1);
|
||||
token1.put(dtId1, renewDate1);
|
||||
|
||||
RMDTSecretManagerState updateSecretManagerState =
|
||||
@ -463,7 +460,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
noKeySecretManagerState.getDTSequenceNumber());
|
||||
|
||||
// check to delete delegationToken
|
||||
store.removeRMDelegationToken(dtId1, sequenceNumber);
|
||||
store.removeRMDelegationToken(dtId1);
|
||||
RMDTSecretManagerState noKeyAndTokenSecretManagerState =
|
||||
store.loadState().getRMDTSecretManagerState();
|
||||
token1.clear();
|
||||
|
@ -337,20 +337,18 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||
RMDelegationTokenIdentifier dtId1 =
|
||||
new RMDelegationTokenIdentifier(new Text("owner1"),
|
||||
new Text("renewer1"), new Text("realuser1"));
|
||||
Long renewDate1 = new Long(System.currentTimeMillis());
|
||||
int sequenceNumber = 1111;
|
||||
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||
sequenceNumber);
|
||||
Long renewDate1 = new Long(System.currentTimeMillis());
|
||||
dtId1.setSequenceNumber(1111);
|
||||
store.storeRMDelegationToken(dtId1, renewDate1);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
store.updateRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||
sequenceNumber);
|
||||
store.updateRMDelegationToken(dtId1, renewDate1);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
// remove delegation key;
|
||||
store.removeRMDelegationToken(dtId1, sequenceNumber);
|
||||
store.removeRMDelegationToken(dtId1);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user