YARN-6093. Minor bugs with AMRMtoken renewal and state store availability when using FederationRMFailoverProxyProvider during RM failover. (Botong Huang via Subru).

(cherry picked from commit 66500f4fa6155d29435d7c92fd6d68079c4cab86)
This commit is contained in:
Subru Krishnan 2017-02-22 13:16:22 -08:00 committed by Carlo Curino
parent 91803305e5
commit 98b45b0ed3
2 changed files with 118 additions and 39 deletions

View File

@ -19,17 +19,21 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -44,6 +48,10 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* Unit tests for FederationRMFailoverProxyProvider. * Unit tests for FederationRMFailoverProxyProvider.
*/ */
@ -151,4 +159,65 @@ private void makeRMActive(final SubClusterId subClusterId,
} }
} }
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testUGIForProxyCreation()
throws IOException, InterruptedException {
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
UserGroupInformation user1 =
UserGroupInformation.createProxyUser("user1", currentUser);
UserGroupInformation user2 =
UserGroupInformation.createProxyUser("user2", currentUser);
final TestableFederationRMFailoverProxyProvider provider =
new TestableFederationRMFailoverProxyProvider();
InetSocketAddress addr =
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
final ClientRMProxy rmProxy = mock(ClientRMProxy.class);
when(rmProxy.getRMAddress(any(YarnConfiguration.class), any(Class.class)))
.thenReturn(addr);
user1.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
provider.init(conf, rmProxy, ApplicationMasterProtocol.class);
return null;
}
});
final ProxyInfo currentProxy = provider.getProxy();
Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
user2.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
provider.performFailover(currentProxy.proxy);
return null;
}
});
Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
provider.close();
}
protected static class TestableFederationRMFailoverProxyProvider<T>
extends FederationRMFailoverProxyProvider<T> {
private UserGroupInformation lastProxyUGI = null;
@Override
protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
lastProxyUGI = UserGroupInformation.getCurrentUser();
return super.createRMProxy(rmAddress);
}
public UserGroupInformation getLastProxyUGI() {
return lastProxyUGI;
}
}
} }

View File

@ -21,7 +21,7 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -29,14 +29,12 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -44,6 +42,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
@ -64,7 +63,7 @@ public class FederationRMFailoverProxyProvider<T>
private YarnConfiguration conf; private YarnConfiguration conf;
private FederationStateStoreFacade facade; private FederationStateStoreFacade facade;
private SubClusterId subClusterId; private SubClusterId subClusterId;
private Collection<Token<? extends TokenIdentifier>> originalTokens; private UserGroupInformation originalUser;
private boolean federationFailoverEnabled = false; private boolean federationFailoverEnabled = false;
@Override @Override
@ -97,59 +96,67 @@ public void init(Configuration configuration, RMProxy<T> proxy,
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS)); YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
try { try {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); this.originalUser = UserGroupInformation.getCurrentUser();
originalTokens = currentUser.getTokens();
LOG.info("Initialized Federation proxy for user: {}", LOG.info("Initialized Federation proxy for user: {}",
currentUser.getUserName()); this.originalUser.getUserName());
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not get information of requester, ignoring for now."); LOG.warn("Could not get information of requester, ignoring for now.");
this.originalUser = null;
} }
} }
private void addOriginalTokens(UserGroupInformation currentUser) { @VisibleForTesting
if (originalTokens == null || originalTokens.isEmpty()) { protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
return; return rmProxy.getProxy(conf, protocol, rmAddress);
}
for (Token<? extends TokenIdentifier> token : originalTokens) {
currentUser.addToken(token);
}
} }
private T getProxyInternal(boolean isFailover) { private T getProxyInternal(boolean isFailover) {
SubClusterInfo subClusterInfo; SubClusterInfo subClusterInfo;
UserGroupInformation currentUser = null; // Use the existing proxy as a backup in case getting the new proxy fails.
// Note that if the first time it fails, the backup is also null. In that
// case we will hit NullPointerException and throw it back to AM.
T proxy = this.current;
try { try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}", LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId); subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover); subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
// updating the conf with the refreshed RM addresses as proxy // updating the conf with the refreshed RM addresses as proxy
// creations // creations are based out of conf
// are based out of conf
updateRMAddress(subClusterInfo); updateRMAddress(subClusterInfo);
currentUser = UserGroupInformation.getCurrentUser(); if (this.originalUser == null) {
addOriginalTokens(currentUser); InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
} catch (YarnException e) { LOG.info(
"Connecting to {} subClusterId {} with protocol {}"
+ " without a proxy user",
rmAddress, subClusterId, protocol.getSimpleName());
proxy = createRMProxy(rmAddress);
} else {
// If the original ugi exists, always use that to create proxy because
// it contains up-to-date AMRMToken
proxy = this.originalUser.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws IOException {
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
LOG.info(
"Connecting to {} subClusterId {} with protocol {} as user {}",
rmAddress, subClusterId, protocol.getSimpleName(),
originalUser);
return createRMProxy(rmAddress);
}
});
}
} catch (Exception e) {
LOG.error("Exception while trying to create proxy to the ResourceManager" LOG.error("Exception while trying to create proxy to the ResourceManager"
+ " for SubClusterId: {}", subClusterId, e); + " for SubClusterId: {}", subClusterId, e);
return null; if (proxy == null) {
} catch (IOException e) { throw new YarnRuntimeException(
LOG.warn("Could not get information of requester, ignoring for now."); String.format("Create initial proxy to the ResourceManager for"
} + " SubClusterId %s failed", subClusterId),
try { e);
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); }
LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
protocol.getSimpleName(), currentUser);
LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
subClusterId);
return rmProxy.getProxy(conf, protocol, rmAddress);
} catch (IOException ioe) {
LOG.error(
"IOException while trying to create proxy to the ResourceManager"
+ " for SubClusterId: {}",
subClusterId, ioe);
return null;
} }
return proxy;
} }
private void updateRMAddress(SubClusterInfo subClusterInfo) { private void updateRMAddress(SubClusterInfo subClusterInfo) {
@ -177,8 +184,11 @@ public synchronized ProxyInfo<T> getProxy() {
@Override @Override
public synchronized void performFailover(T currentProxy) { public synchronized void performFailover(T currentProxy) {
closeInternal(currentProxy); // It will not return null proxy here
current = getProxyInternal(federationFailoverEnabled); current = getProxyInternal(federationFailoverEnabled);
if (current != currentProxy) {
closeInternal(currentProxy);
}
} }
@Override @Override