YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based state-store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu.
This commit is contained in:
parent
31b627b2a8
commit
4c7b9b6abe
@ -373,6 +373,9 @@ Release 2.7.1 - UNRELEASED
|
|||||||
YARN-3301. Fixed the format issue of the new RM attempt web page.
|
YARN-3301. Fixed the format issue of the new RM attempt web page.
|
||||||
(Xuan Gong via jianhe)
|
(Xuan Gong via jianhe)
|
||||||
|
|
||||||
|
YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based
|
||||||
|
state-store to avoid crashing on duplicate deletes. (Zhihai Xu via vinodkv)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -694,7 +694,7 @@ public synchronized void removeApplicationStateInternal(
|
|||||||
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
|
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
|
||||||
+ " and its attempts.");
|
+ " and its attempts.");
|
||||||
}
|
}
|
||||||
doMultiWithRetries(opList);
|
doDeleteMultiWithRetries(opList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -703,13 +703,12 @@ protected synchronized void storeRMDelegationTokenState(
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
ArrayList<Op> opList = new ArrayList<Op>();
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
||||||
doMultiWithRetries(opList);
|
doStoreMultiWithRetries(opList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void removeRMDelegationTokenState(
|
protected synchronized void removeRMDelegationTokenState(
|
||||||
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
||||||
ArrayList<Op> opList = new ArrayList<Op>();
|
|
||||||
String nodeRemovePath =
|
String nodeRemovePath =
|
||||||
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
||||||
+ rmDTIdentifier.getSequenceNumber());
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
@ -718,11 +717,12 @@ protected synchronized void removeRMDelegationTokenState(
|
|||||||
+ rmDTIdentifier.getSequenceNumber());
|
+ rmDTIdentifier.getSequenceNumber());
|
||||||
}
|
}
|
||||||
if (existsWithRetries(nodeRemovePath, false) != null) {
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
||||||
|
ArrayList<Op> opList = new ArrayList<Op>();
|
||||||
opList.add(Op.delete(nodeRemovePath, -1));
|
opList.add(Op.delete(nodeRemovePath, -1));
|
||||||
|
doDeleteMultiWithRetries(opList);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
}
|
}
|
||||||
doMultiWithRetries(opList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -741,7 +741,7 @@ protected synchronized void updateRMDelegationTokenState(
|
|||||||
// in case znode exists
|
// in case znode exists
|
||||||
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
|
||||||
}
|
}
|
||||||
doMultiWithRetries(opList);
|
doStoreMultiWithRetries(opList);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
||||||
@ -810,7 +810,7 @@ protected synchronized void removeRMDTMasterKeyState(
|
|||||||
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
||||||
}
|
}
|
||||||
if (existsWithRetries(nodeRemovePath, false) != null) {
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
||||||
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
||||||
}
|
}
|
||||||
@ -914,7 +914,7 @@ String getNodePath(String root, String nodeName) {
|
|||||||
* Helper method that creates fencing node, executes the passed operations,
|
* Helper method that creates fencing node, executes the passed operations,
|
||||||
* and deletes the fencing node.
|
* and deletes the fencing node.
|
||||||
*/
|
*/
|
||||||
private synchronized void doMultiWithRetries(
|
private synchronized void doStoreMultiWithRetries(
|
||||||
final List<Op> opList) throws Exception {
|
final List<Op> opList) throws Exception {
|
||||||
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
||||||
execOpList.add(createFencingNodePathOp);
|
execOpList.add(createFencingNodePathOp);
|
||||||
@ -933,8 +933,32 @@ public Void run() throws KeeperException, InterruptedException {
|
|||||||
* Helper method that creates fencing node, executes the passed operation,
|
* Helper method that creates fencing node, executes the passed operation,
|
||||||
* and deletes the fencing node.
|
* and deletes the fencing node.
|
||||||
*/
|
*/
|
||||||
private void doMultiWithRetries(final Op op) throws Exception {
|
private void doStoreMultiWithRetries(final Op op) throws Exception {
|
||||||
doMultiWithRetries(Collections.singletonList(op));
|
doStoreMultiWithRetries(Collections.singletonList(op));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method that creates fencing node, executes the passed
|
||||||
|
* delete related operations and deletes the fencing node.
|
||||||
|
*/
|
||||||
|
private synchronized void doDeleteMultiWithRetries(
|
||||||
|
final List<Op> opList) throws Exception {
|
||||||
|
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
||||||
|
execOpList.add(createFencingNodePathOp);
|
||||||
|
execOpList.addAll(opList);
|
||||||
|
execOpList.add(deleteFencingNodePathOp);
|
||||||
|
new ZKAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws KeeperException, InterruptedException {
|
||||||
|
setHasDeleteNodeOp(true);
|
||||||
|
zkClient.multi(execOpList);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doDeleteMultiWithRetries(final Op op) throws Exception {
|
||||||
|
doDeleteMultiWithRetries(Collections.singletonList(op));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -943,7 +967,7 @@ private void doMultiWithRetries(final Op op) throws Exception {
|
|||||||
public void createWithRetries(
|
public void createWithRetries(
|
||||||
final String path, final byte[] data, final List<ACL> acl,
|
final String path, final byte[] data, final List<ACL> acl,
|
||||||
final CreateMode mode) throws Exception {
|
final CreateMode mode) throws Exception {
|
||||||
doMultiWithRetries(Op.create(path, data, acl, mode));
|
doStoreMultiWithRetries(Op.create(path, data, acl, mode));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -951,7 +975,7 @@ public void createWithRetries(
|
|||||||
@Unstable
|
@Unstable
|
||||||
public void setDataWithRetries(final String path, final byte[] data,
|
public void setDataWithRetries(final String path, final byte[] data,
|
||||||
final int version) throws Exception {
|
final int version) throws Exception {
|
||||||
doMultiWithRetries(Op.setData(path, data, version));
|
doStoreMultiWithRetries(Op.setData(path, data, version));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -1017,7 +1041,12 @@ private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
|
|||||||
for (String child : children) {
|
for (String child : children) {
|
||||||
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
|
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
zkClient.delete(path, -1);
|
zkClient.delete(path, -1);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
LOG.info("Node " + path + " doesn't exist to delete");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1037,7 +1066,7 @@ public void run() {
|
|||||||
if(isFencedState()) {
|
if(isFencedState()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
doMultiWithRetries(emptyOpList);
|
doStoreMultiWithRetries(emptyOpList);
|
||||||
Thread.sleep(zkSessionTimeout);
|
Thread.sleep(zkSessionTimeout);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
@ -1050,6 +1079,10 @@ public void run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private abstract class ZKAction<T> {
|
private abstract class ZKAction<T> {
|
||||||
|
private boolean hasDeleteNodeOp = false;
|
||||||
|
void setHasDeleteNodeOp(boolean hasDeleteOp) {
|
||||||
|
this.hasDeleteNodeOp = hasDeleteOp;
|
||||||
|
}
|
||||||
// run() expects synchronization on ZKRMStateStore.this
|
// run() expects synchronization on ZKRMStateStore.this
|
||||||
abstract T run() throws KeeperException, InterruptedException;
|
abstract T run() throws KeeperException, InterruptedException;
|
||||||
|
|
||||||
@ -1099,6 +1132,11 @@ T runWithRetries() throws Exception {
|
|||||||
LOG.info("znode already exists!");
|
LOG.info("znode already exists!");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
|
||||||
|
LOG.info("znode has already been deleted!");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Exception while executing a ZK operation.", ke);
|
LOG.info("Exception while executing a ZK operation.", ke);
|
||||||
if (shouldRetry(ke.code()) && ++retry < numRetries) {
|
if (shouldRetry(ke.code()) && ++retry < numRetries) {
|
||||||
LOG.info("Retrying operation on ZK. Retry no. " + retry);
|
LOG.info("Retrying operation on ZK. Retry no. " + retry);
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
@ -58,6 +60,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -381,4 +384,36 @@ public void testFencedState() throws Exception {
|
|||||||
|
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDuplicateRMAppDeletion() throws Exception {
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
long submitTime = System.currentTimeMillis();
|
||||||
|
long startTime = System.currentTimeMillis() + 1234;
|
||||||
|
RMStateStore store = zkTester.getRMStateStore();
|
||||||
|
TestDispatcher dispatcher = new TestDispatcher();
|
||||||
|
store.setRMDispatcher(dispatcher);
|
||||||
|
|
||||||
|
ApplicationAttemptId attemptIdRemoved = ConverterUtils
|
||||||
|
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
|
||||||
|
ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
|
||||||
|
storeApp(store, appIdRemoved, submitTime, startTime);
|
||||||
|
storeAttempt(store, attemptIdRemoved,
|
||||||
|
"container_1352994193343_0002_01_000001", null, null, dispatcher);
|
||||||
|
|
||||||
|
ApplicationSubmissionContext context =
|
||||||
|
new ApplicationSubmissionContextPBImpl();
|
||||||
|
context.setApplicationId(appIdRemoved);
|
||||||
|
ApplicationStateData appStateRemoved =
|
||||||
|
ApplicationStateData.newInstance(
|
||||||
|
submitTime, startTime, context, "user1");
|
||||||
|
appStateRemoved.attempts.put(attemptIdRemoved, null);
|
||||||
|
store.removeApplicationStateInternal(appStateRemoved);
|
||||||
|
try {
|
||||||
|
store.removeApplicationStateInternal(appStateRemoved);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
Assert.fail("NoNodeException should not happen.");
|
||||||
|
}
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user