YARN-1641. ZK store should attempt a write periodically to ensure it is still Active. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1567628 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-02-12 14:09:13 +00:00
parent d82bc78733
commit 41ec3cce76
4 changed files with 42 additions and 33 deletions

View File

@ -176,6 +176,9 @@ Release 2.4.0 - UNRELEASED
YARN-1706. Created an utility method to dump timeline records to JSON YARN-1706. Created an utility method to dump timeline records to JSON
strings. (zjshen) strings. (zjshen)
YARN-1641. ZK store should attempt a write periodically to ensure it is
still Active. (kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -676,11 +676,11 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
/** /**
* In {#handleStoreEvent}, this method is called to notify the * This method is called to notify the ResourceManager that the store
* ResourceManager that the store operation has failed. * operation has failed.
* @param failureCause the exception due to which the operation failed * @param failureCause the exception due to which the operation failed
*/ */
private void notifyStoreOperationFailed(Exception failureCause) { protected void notifyStoreOperationFailed(Exception failureCause) {
RMFatalEventType type; RMFatalEventType type;
if (failureCause instanceof StoreFencedException) { if (failureCause instanceof StoreFencedException) {
type = RMFatalEventType.STATE_STORE_FENCED; type = RMFatalEventType.STATE_STORE_FENCED;

View File

@ -137,6 +137,7 @@ public class ZKRMStateStore extends RMStateStore {
private String fencingNodePath; private String fencingNodePath;
private Op createFencingNodePathOp; private Op createFencingNodePathOp;
private Op deleteFencingNodePathOp; private Op deleteFencingNodePathOp;
private Thread verifyActiveStatusThread;
private String zkRootNodeUsername; private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong()); private final String zkRootNodePassword = Long.toString(random.nextLong());
@ -258,6 +259,8 @@ public class ZKRMStateStore extends RMStateStore {
createRootDir(zkRootNodePath); createRootDir(zkRootNodePath);
if (HAUtil.isHAEnabled(getConfig())){ if (HAUtil.isHAEnabled(getConfig())){
fence(); fence();
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
} }
createRootDir(rmAppRoot); createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot); createRootDir(rmDTSecretManagerRoot);
@ -350,6 +353,10 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
protected synchronized void closeInternal() throws Exception { protected synchronized void closeInternal() throws Exception {
if (verifyActiveStatusThread != null) {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
closeZkClients(); closeZkClients();
} }
@ -856,6 +863,32 @@ public class ZKRMStateStore extends RMStateStore {
}.runWithRetries(); }.runWithRetries();
} }
/**
* Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active.
*/
private class VerifyActiveStatusThread extends Thread {
private List<Op> emptyOpList = new ArrayList<Op>();
VerifyActiveStatusThread() {
super(VerifyActiveStatusThread.class.getName());
}
public void run() {
try {
while (true) {
doMultiWithRetries(emptyOpList);
Thread.sleep(zkSessionTimeout);
}
} catch (InterruptedException ie) {
LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
"interrupted! Exiting!");
} catch (Exception e) {
notifyStoreOperationFailed(new StoreFencedException());
}
}
}
private abstract class ZKAction<T> { private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this // run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException; abstract T run() throws KeeperException, InterruptedException;

View File

@ -23,10 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -34,15 +31,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
@ -54,6 +44,7 @@ import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase { public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
private static final int ZK_TIMEOUT_MS = 1000;
class TestZKRMStateStoreTester implements RMStateStoreHelper { class TestZKRMStateStoreTester implements RMStateStoreHelper {
@ -141,6 +132,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId); conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
for (String id : HAUtil.getRMHAIds(conf)) { for (String id : HAUtil.getRMHAIds(conf)) {
@ -182,26 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
HAServiceProtocol.HAServiceState.ACTIVE, HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
// Submitting an application to RM1 to trigger a state store operation. for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
// RM1 should realize that it got fenced and is not the Active RM anymore.
Map mockMap = mock(Map.class);
ApplicationSubmissionContext asc =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(1000, 1),
"testApplication", // app Name
"default", // queue name
Priority.newInstance(0),
ContainerLaunchContext.newInstance(mockMap, mockMap,
new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
mockMap),
false, // unmanaged AM
true, // cancelTokens
1, // max app attempts
Resource.newInstance(1024, 1));
ClientRMService rmService = rm1.getClientRMService();
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
for (int i = 0; i < 30; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE == if (HAServiceProtocol.HAServiceState.ACTIVE ==
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) { rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
Thread.sleep(100); Thread.sleep(100);