YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or fail-over. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1581448 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-25 18:21:55 +00:00
parent 274d9b2ac4
commit d14eff7d38
5 changed files with 54 additions and 29 deletions

View File

@ -571,6 +571,9 @@ Release 2.4.0 - UNRELEASED
YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled YARN-1852. Fixed RMAppAttempt to not resend AttemptFailed/AttemptKilled
events to already recovered Failed/Killed RMApps. (Rohith via jianhe) events to already recovered Failed/Killed RMApps. (Rohith via jianhe)
YARN-1866. Fixed an issue with renewal of RM-delegation tokens on restart or
fail-over. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -112,6 +112,8 @@ protected synchronized void serviceInit(Configuration conf) throws Exception {
this.tokenRemovalDelayMs = this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf); renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>(); pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
renewalTimer = new Timer(true); renewalTimer = new Timer(true);
@ -134,6 +136,13 @@ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
return pool; return pool;
} }
// enable RM to short-circuit token operations directly to itself
private void setLocalSecretManagerAndServiceAddr() {
RMDelegationTokenIdentifier.Renewer.setSecretManager(rmContext
.getRMDelegationTokenSecretManager(), rmContext.getClientRMService()
.getBindAddress());
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
dtCancelThread.start(); dtCancelThread.start();
@ -143,10 +152,8 @@ protected void serviceStart() throws Exception {
"DelayedTokenCanceller"); "DelayedTokenCanceller");
delayedRemovalThread.start(); delayedRemovalThread.start();
} }
// enable RM to short-circuit token operations directly to itself
RMDelegationTokenIdentifier.Renewer.setSecretManager( setLocalSecretManagerAndServiceAddr();
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
serviceStateLock.writeLock().lock(); serviceStateLock.writeLock().lock();
isServiceStarted = true; isServiceStarted = true;
serviceStateLock.writeLock().unlock(); serviceStateLock.writeLock().unlock();

View File

@ -394,7 +394,7 @@ public void testRMRestart() throws Exception {
Assert.assertEquals(4, rmAppState.size()); Assert.assertEquals(4, rmAppState.size());
} }
@Test @Test (timeout = 60000)
public void testRMRestartAppRunningAMFailed() throws Exception { public void testRMRestartAppRunningAMFailed() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -440,7 +440,7 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testRMRestartWaitForPreviousAMToFinish() throws Exception { public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
// testing 3 cases // testing 3 cases
// After RM restarts // After RM restarts
@ -607,7 +607,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
// store but before the RMAppAttempt notifies RMApp that it has succeeded. On // store but before the RMAppAttempt notifies RMApp that it has succeeded. On
// recovery, RMAppAttempt should send the AttemptFinished event to RMApp so // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
// that RMApp can recover its state. // that RMApp can recover its state.
@Test @Test (timeout = 60000)
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
@ -660,7 +660,7 @@ public void updateApplicationStateInternal(ApplicationId appId,
rmAppState.get(app0.getApplicationId()).getState()); rmAppState.get(app0.getApplicationId()).getState());
} }
@Test @Test (timeout = 60000)
public void testRMRestartFailedApp() throws Exception { public void testRMRestartFailedApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
@ -709,7 +709,7 @@ public void testRMRestartFailedApp() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testRMRestartKilledApp() throws Exception{ public void testRMRestartKilledApp() throws Exception{
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -757,7 +757,7 @@ public void testRMRestartKilledApp() throws Exception{
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testRMRestartKilledAppWithNoAttempts() throws Exception { public void testRMRestartKilledAppWithNoAttempts() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override @Override
@ -797,7 +797,7 @@ public synchronized void updateApplicationAttemptStateInternal(
Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0); Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
} }
@Test @Test (timeout = 60000)
public void testRMRestartSucceededApp() throws Exception { public void testRMRestartSucceededApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -849,7 +849,7 @@ public void testRMRestartSucceededApp() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testRMRestartGetApplicationList() throws Exception { public void testRMRestartGetApplicationList() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
@ -997,7 +997,7 @@ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
appState.getAttempt(am.getApplicationAttemptId()).getState()); appState.getAttempt(am.getApplicationAttemptId()).getState());
} }
@Test @Test (timeout = 60000)
public void testRMRestartOnMaxAppAttempts() throws Exception { public void testRMRestartOnMaxAppAttempts() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -1071,7 +1071,7 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testDelegationTokenRestoredInDelegationTokenRenewer() public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception { throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@ -1171,7 +1171,7 @@ private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
} }
} }
@Test @Test (timeout = 60000)
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
@ -1261,7 +1261,7 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test @Test (timeout = 60000)
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set( conf.set(
@ -1414,7 +1414,7 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
// This is to test submit an application to the new RM with the old delegation // This is to test submit an application to the new RM with the old delegation
// token got from previous RM. // token got from previous RM.
@Test @Test (timeout = 60000)
public void testAppSubmissionWithOldDelegationTokenAfterRMRestart() public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
throws Exception { throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@ -1449,7 +1449,7 @@ public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
} }
@Test @Test (timeout = 60000)
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
volatile boolean wait = true; volatile boolean wait = true;
@ -1508,7 +1508,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
Assert.assertTrue(rmAppState.size() == NUM_APPS); Assert.assertTrue(rmAppState.size() == NUM_APPS);
} }
@Test @Test (timeout = 60000)
public void testFinishedAppRemovalAfterRMRestart() throws Exception { public void testFinishedAppRemovalAfterRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
@ -1580,7 +1580,7 @@ public synchronized void checkVersion()
// This is to test Killing application should be able to wait until app // This is to test Killing application should be able to wait until app
// reaches killed state and also check that attempt state is saved before app // reaches killed state and also check that attempt state is saved before app
// state is saved. // state is saved.
@Test @Test (timeout = 60000)
public void testClientRetryOnKillingApplication() throws Exception { public void testClientRetryOnKillingApplication() throws Exception {
MemoryRMStateStore memStore = new TestMemoryRMStateStore(); MemoryRMStateStore memStore = new TestMemoryRMStateStore();
memStore.init(conf); memStore.init(conf);
@ -1738,7 +1738,7 @@ private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
appsCompleted + appsCompletedCarryOn); appsCompleted + appsCompletedCarryOn);
} }
@Test @Test (timeout = 60000)
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception { public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@ -1875,6 +1875,13 @@ public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store); super(conf, store);
} }
@Override
public void init(Configuration conf) {
// reset localServiceAddress.
RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
super.init(conf);
}
@Override @Override
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(), return new ClientRMService(getRMContext(), getResourceScheduler(),

View File

@ -43,8 +43,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -82,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -179,7 +178,6 @@ public void setUp() throws Exception {
dispatcher = new AsyncDispatcher(eventQueue); dispatcher = new AsyncDispatcher(eventQueue);
Renewer.reset(); Renewer.reset();
delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class); RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class); ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getDelegationTokenRenewer()).thenReturn( when(mockContext.getDelegationTokenRenewer()).thenReturn(
@ -190,6 +188,7 @@ public void setUp() throws Exception {
InetSocketAddress.createUnresolved("localhost", 1234); InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.setRMContext(mockContext);
delegationTokenRenewer.init(conf);
delegationTokenRenewer.start(); delegationTokenRenewer.start();
} }
@ -515,7 +514,6 @@ public void testDTKeepAlive1 () throws Exception {
1000l); 1000l);
DelegationTokenRenewer localDtr = DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(lconf, counter); createNewDelegationTokenRenewer(lconf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class); RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class); ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
@ -526,6 +524,7 @@ public void testDTKeepAlive1 () throws Exception {
InetSocketAddress.createUnresolved("localhost", 1234); InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
localDtr.setRMContext(mockContext); localDtr.setRMContext(mockContext);
localDtr.init(lconf);
localDtr.start(); localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf); MyFS dfs = (MyFS)FileSystem.get(lconf);
@ -592,7 +591,6 @@ public void testDTKeepAlive2() throws Exception {
1000l); 1000l);
DelegationTokenRenewer localDtr = DelegationTokenRenewer localDtr =
createNewDelegationTokenRenewer(conf, counter); createNewDelegationTokenRenewer(conf, counter);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class); RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class); ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
@ -603,6 +601,7 @@ public void testDTKeepAlive2() throws Exception {
InetSocketAddress.createUnresolved("localhost", 1234); InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
localDtr.setRMContext(mockContext); localDtr.setRMContext(mockContext);
localDtr.init(lconf);
localDtr.start(); localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf); MyFS dfs = (MyFS)FileSystem.get(lconf);
@ -704,7 +703,6 @@ public Long answer(InvocationOnMock invocation)
// fire up the renewer // fire up the renewer
final DelegationTokenRenewer dtr = final DelegationTokenRenewer dtr =
createNewDelegationTokenRenewer(conf, counter); createNewDelegationTokenRenewer(conf, counter);
dtr.init(conf);
RMContext mockContext = mock(RMContext.class); RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class); ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService); when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
@ -713,6 +711,7 @@ public Long answer(InvocationOnMock invocation)
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
dtr.setRMContext(mockContext); dtr.setRMContext(mockContext);
when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr); when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
dtr.init(conf);
dtr.start(); dtr.start();
// submit a job that blocks during renewal // submit a job that blocks during renewal
Thread submitThread = new Thread() { Thread submitThread = new Thread() {

View File

@ -19,7 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.security; package org.apache.hadoop.yarn.server.resourcemanager.security;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.junit.Test; import org.junit.Test;
/** /**
@ -32,9 +37,13 @@ public class TestDelegationTokenRenewerLifecycle {
@Test @Test
public void testStartupFailure() throws Exception { public void testStartupFailure() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
DelegationTokenRenewer delegationTokenRenewer = new DelegationTokenRenewer(); DelegationTokenRenewer delegationTokenRenewer =
new DelegationTokenRenewer();
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
delegationTokenRenewer.setRMContext(mockContext);
delegationTokenRenewer.init(conf); delegationTokenRenewer.init(conf);
delegationTokenRenewer.stop(); delegationTokenRenewer.stop();
} }
} }