YARN-10141.Interceptor in FederationInterceptorREST doesnt update on RM switchover. Contributed by D M Murali Krishna Reddy.
This commit is contained in:
parent
352a4ec16d
commit
3a9ccf7f6d
@ -81,6 +81,10 @@ public void setWebAppAddress(String webAppAddress) {
|
|||||||
this.webAppAddress = webAppAddress;
|
this.webAppAddress = webAppAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String getWebAppAddress() {
|
||||||
|
return this.webAppAddress;
|
||||||
|
}
|
||||||
|
|
||||||
protected void setSubClusterId(SubClusterId scId) {
|
protected void setSubClusterId(SubClusterId scId) {
|
||||||
this.subClusterId = scId;
|
this.subClusterId = scId;
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,7 @@
|
|||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -237,7 +238,10 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
|
|||||||
SubClusterId subClusterId, String webAppAddress) {
|
SubClusterId subClusterId, String webAppAddress) {
|
||||||
DefaultRequestInterceptorREST interceptor =
|
DefaultRequestInterceptorREST interceptor =
|
||||||
getInterceptorForSubCluster(subClusterId);
|
getInterceptorForSubCluster(subClusterId);
|
||||||
if (interceptor == null) {
|
String webAppAddresswithScheme = WebAppUtils.getHttpSchemePrefix(
|
||||||
|
this.getConf()) + webAppAddress;
|
||||||
|
if (interceptor == null || !webAppAddresswithScheme.equals(interceptor.
|
||||||
|
getWebAppAddress())){
|
||||||
interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
|
interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
|
||||||
}
|
}
|
||||||
return interceptor;
|
return interceptor;
|
||||||
|
@ -32,6 +32,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||||
@ -44,6 +47,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -521,4 +525,39 @@ public void testGetApplicationStateWrongFormat()
|
|||||||
Assert.assertNull(response);
|
Assert.assertNull(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test validates the creation of new interceptor in case of a
|
||||||
|
* RMSwitchover in a subCluster.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRMSwitchoverOfOneSC() throws Exception {
|
||||||
|
SubClusterId subClusterId = SubClusterId.newInstance(Integer.toString(0));
|
||||||
|
|
||||||
|
interceptor.getClusterMetricsInfo();
|
||||||
|
Assert.assertEquals("http://1.2.3.4:4", interceptor
|
||||||
|
.getInterceptorForSubCluster(subClusterId).getWebAppAddress());
|
||||||
|
|
||||||
|
//Register the first subCluster with secondRM simulating RMSwitchover
|
||||||
|
registerSubClusterWithSwitchoverRM(subClusterId);
|
||||||
|
|
||||||
|
interceptor.getClusterMetricsInfo();
|
||||||
|
Assert.assertEquals("http://5.6.7.8:8", interceptor
|
||||||
|
.getInterceptorForSubCluster(subClusterId).getWebAppAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerSubClusterWithSwitchoverRM(SubClusterId subClusterId)
|
||||||
|
throws YarnException {
|
||||||
|
String amRMAddress = "5.6.7.8:5";
|
||||||
|
String clientRMAddress = "5.6.7.8:6";
|
||||||
|
String rmAdminAddress = "5.6.7.8:7";
|
||||||
|
String webAppAddress = "5.6.7.8:8";
|
||||||
|
|
||||||
|
SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
|
||||||
|
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
|
||||||
|
SubClusterState.SC_RUNNING, new MonotonicClock().getTime(),
|
||||||
|
"capability");
|
||||||
|
stateStore.registerSubCluster(
|
||||||
|
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ public class TestableFederationInterceptorREST
|
|||||||
protected void registerBadSubCluster(SubClusterId badSC) {
|
protected void registerBadSubCluster(SubClusterId badSC) {
|
||||||
|
|
||||||
// Adding in the cache the bad SubCluster, in this way we can stop them
|
// Adding in the cache the bad SubCluster, in this way we can stop them
|
||||||
getOrCreateInterceptorForSubCluster(badSC, "test");
|
getOrCreateInterceptorForSubCluster(badSC, "1.2.3.4:4");
|
||||||
|
|
||||||
badSubCluster.add(badSC);
|
badSubCluster.add(badSC);
|
||||||
MockDefaultRequestInterceptorREST interceptor =
|
MockDefaultRequestInterceptorREST interceptor =
|
||||||
|
Loading…
Reference in New Issue
Block a user