YARN-11515. [Federation] Improve DefaultRequestInterceptor#init Code. (#5752)
This commit is contained in:
parent
8b88e9f8f4
commit
680af87377
@ -98,6 +98,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
|
||||
@ -107,25 +109,26 @@ import org.apache.hadoop.classification.VisibleForTesting;
|
||||
*/
|
||||
public class DefaultClientRequestInterceptor
|
||||
extends AbstractClientRequestInterceptor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
|
||||
private ApplicationClientProtocol clientRMProxy;
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
super.init(userName);
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
try {
|
||||
clientRMProxy =
|
||||
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
public ApplicationClientProtocol run() throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ApplicationClientProtocol.class);
|
||||
}
|
||||
});
|
||||
final Configuration conf = this.getConf();
|
||||
clientRMProxy = user.doAs(
|
||||
(PrivilegedExceptionAction<ApplicationClientProtocol>) () ->
|
||||
ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class));
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Unable to create the interface to reach the YarnRM", e);
|
||||
StringBuilder message = new StringBuilder();
|
||||
message.append("Error while creating Router RMClient Service");
|
||||
if (user != null) {
|
||||
message.append(", user: " + user);
|
||||
}
|
||||
LOG.error(message.toString(), e);
|
||||
throw new YarnRuntimeException(message.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -355,6 +358,5 @@ public class DefaultClientRequestInterceptor
|
||||
@VisibleForTesting
|
||||
public void setRMClient(ApplicationClientProtocol clientRM) {
|
||||
this.clientRMProxy = clientRM;
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -80,38 +80,18 @@ public class DefaultRMAdminRequestInterceptor
|
||||
public void init(String userName) {
|
||||
super.init(userName);
|
||||
try {
|
||||
// Do not create a proxy user if user name matches the user name on
|
||||
// current UGI
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
|
||||
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
user = UserGroupInformation.createProxyUser(userName,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
}
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
|
||||
rmAdminProxy = user.doAs(
|
||||
new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
|
||||
@Override
|
||||
public ResourceManagerAdministrationProtocol run()
|
||||
throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ResourceManagerAdministrationProtocol.class);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
String message = "Error while creating Router RMAdmin Service for user:";
|
||||
if (user != null) {
|
||||
message += ", user: " + user;
|
||||
}
|
||||
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
(PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>) () ->
|
||||
ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class));
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
StringBuilder message = new StringBuilder();
|
||||
message.append("Error while creating Router RMAdmin Service");
|
||||
if (user != null) {
|
||||
message.append(", user: " + user);
|
||||
}
|
||||
LOG.error(message.toString(), e);
|
||||
throw new YarnRuntimeException(message.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,7 +106,6 @@ public class DefaultRMAdminRequestInterceptor
|
||||
@VisibleForTesting
|
||||
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
|
||||
this.rmAdminProxy = rmAdmin;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -103,6 +103,7 @@ public class DefaultRequestInterceptorREST
|
||||
|
||||
@Override
|
||||
public void init(String user) {
|
||||
super.init(user);
|
||||
webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(getConf());
|
||||
client = RouterWebServiceUtil.createJerseyClient(getConf());
|
||||
}
|
||||
|
@ -211,7 +211,7 @@ public class MockDefaultRequestInterceptorREST
|
||||
validateRunning();
|
||||
|
||||
ApplicationId applicationId =
|
||||
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
|
||||
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
|
||||
applicationCounter.incrementAndGet());
|
||||
NewApplication appId =
|
||||
new NewApplication(applicationId.toString(), new ResourceInfo());
|
||||
@ -275,7 +275,7 @@ public class MockDefaultRequestInterceptorREST
|
||||
AppInfo appInfo = new AppInfo();
|
||||
|
||||
appInfo.setAppId(
|
||||
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
|
||||
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
|
||||
applicationCounter.incrementAndGet()).toString());
|
||||
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
|
||||
|
||||
@ -316,7 +316,7 @@ public class MockDefaultRequestInterceptorREST
|
||||
if (nodeId.contains(subClusterId) || nodeId.contains("test")) {
|
||||
node = new NodeInfo();
|
||||
node.setId(nodeId);
|
||||
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
|
||||
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
|
||||
}
|
||||
return node;
|
||||
}
|
||||
@ -328,7 +328,7 @@ public class MockDefaultRequestInterceptorREST
|
||||
}
|
||||
NodeInfo node = new NodeInfo();
|
||||
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
|
||||
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
|
||||
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
|
||||
NodesInfo nodes = new NodesInfo();
|
||||
nodes.add(node);
|
||||
return nodes;
|
||||
@ -350,12 +350,12 @@ public class MockDefaultRequestInterceptorREST
|
||||
throw new RuntimeException("RM is stopped");
|
||||
}
|
||||
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
|
||||
metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
|
||||
metrics.setAppsSubmitted(Integer.parseInt(getSubClusterId().getId()));
|
||||
metrics.setAppsCompleted(Integer.parseInt(getSubClusterId().getId()));
|
||||
metrics.setAppsPending(Integer.parseInt(getSubClusterId().getId()));
|
||||
metrics.setAppsRunning(Integer.parseInt(getSubClusterId().getId()));
|
||||
metrics.setAppsFailed(Integer.parseInt(getSubClusterId().getId()));
|
||||
metrics.setAppsKilled(Integer.parseInt(getSubClusterId().getId()));
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user