YARN-11508. [Minor] Improve UnmanagedAMPoolManager/UnmanagedApplicationManager Code (#5726)

This commit is contained in:
slfan1989 2023-07-28 20:20:23 +08:00 committed by GitHub
parent aae5527c9a
commit 5056c267e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 91 deletions

View File

@ -76,7 +76,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
private ExecutorService threadpool; private ExecutorService threadpool;
private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread"; private final String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread";
private Thread finishApplicationThread; private Thread finishApplicationThread;
@ -138,7 +138,7 @@ public String createAndRegisterNewUAM(
boolean keepContainersAcrossApplicationAttempts, String rmName, boolean keepContainersAcrossApplicationAttempts, String rmName,
ApplicationSubmissionContext originalAppSubmissionContext) ApplicationSubmissionContext originalAppSubmissionContext)
throws YarnException, IOException { throws YarnException, IOException {
ApplicationId appId = null; ApplicationId appId;
ApplicationClientProtocol rmClient; ApplicationClientProtocol rmClient;
try { try {
UserGroupInformation appSubmitter = UserGroupInformation appSubmitter =
@ -198,14 +198,16 @@ public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
if (this.unmanagedAppMasterMap.containsKey(uamId)) { if (this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " already exists"); throw new YarnException("UAM " + uamId + " already exists");
} }
UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
rmName, originalAppSubmissionContext); rmName, originalAppSubmissionContext);
// Put the UAM into map first before initializing it to avoid additional UAM // Put the UAM into map first before initializing it to avoid additional UAM
// for the same uamId being created concurrently // for the same uamId being created concurrently
this.unmanagedAppMasterMap.put(uamId, uam); this.unmanagedAppMasterMap.put(uamId, uam);
Token<AMRMTokenIdentifier> amrmToken = null; Token<AMRMTokenIdentifier> amrmToken;
try { try {
LOG.info("Launching UAM id {} for application {}", uamId, appId); LOG.info("Launching UAM id {} for application {}", uamId, appId);
amrmToken = uam.launchUAM(); amrmToken = uam.launchUAM();
@ -390,7 +392,7 @@ public void shutDownConnections() throws YarnException {
public Set<String> getAllUAMIds() { public Set<String> getAllUAMIds() {
// Return a clone of the current id set for concurrency reasons, so that the // Return a clone of the current id set for concurrency reasons, so that the
// returned map won't change with the actual map // returned map won't change with the actual map
return new HashSet<String>(this.unmanagedAppMasterMap.keySet()); return new HashSet<>(this.unmanagedAppMasterMap.keySet());
} }
/** /**
@ -439,7 +441,7 @@ public void drainUAMHeartbeats() {
* *
* @param request FinishApplicationMasterRequest * @param request FinishApplicationMasterRequest
* @param appId application Id * @param appId application Id
* @return Returns the Map map, * @return Returns the Map,
* the key is subClusterId, the value is FinishApplicationMasterResponse * the key is subClusterId, the value is FinishApplicationMasterResponse
*/ */
public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster( public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster(

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -147,10 +148,8 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
this.registerRequest = null; this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
this.asyncApiPollIntervalMillis = conf.getLong( this.asyncApiPollIntervalMillis = conf.getLong(
YarnConfiguration. YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
YarnConfiguration.
DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
this.keepContainersAcrossApplicationAttempts = this.keepContainersAcrossApplicationAttempts =
keepContainersAcrossApplicationAttempts; keepContainersAcrossApplicationAttempts;
this.applicationSubmissionContext = originalApplicationSubmissionContext; this.applicationSubmissionContext = originalApplicationSubmissionContext;
@ -175,8 +174,7 @@ public Token<AMRMTokenIdentifier> launchUAM()
this.connectionInitiated = true; this.connectionInitiated = true;
// Blocking call to RM // Blocking call to RM
Token<AMRMTokenIdentifier> amrmToken = Token<AMRMTokenIdentifier> amrmToken = initializeUnmanagedAM(this.applicationId);
initializeUnmanagedAM(this.applicationId);
// Creates the UAM connection // Creates the UAM connection
createUAMProxy(amrmToken); createUAMProxy(amrmToken);
@ -217,8 +215,8 @@ protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
* @throws IOException if register fails * @throws IOException if register fails
*/ */
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) RegisterApplicationMasterRequest request) throws YarnException, IOException {
throws YarnException, IOException {
// Save the register request for re-register later // Save the register request for re-register later
this.registerRequest = request; this.registerRequest = request;
@ -228,16 +226,17 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
this.heartbeatHandler.resetLastResponseId(); this.heartbeatHandler.resetLastResponseId();
for (Container container : response.getContainersFromPreviousAttempts()) { if (LOG.isDebugEnabled()) {
LOG.debug("RegisterUAM returned existing running container {}", for (Container container : response.getContainersFromPreviousAttempts()) {
container.getId()); LOG.debug("RegisterUAM returned existing running container {}", container.getId());
}
for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
LOG.debug("RegisterUAM returned existing NM token for node {}", nmToken.getNodeId());
}
} }
for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
LOG.debug("RegisterUAM returned existing NM token for node {}", LOG.info("RegisterUAM returned {} existing running container and {} NM tokens",
nmToken.getNodeId());
}
LOG.info(
"RegisterUAM returned {} existing running container and {} NM tokens",
response.getContainersFromPreviousAttempts().size(), response.getContainersFromPreviousAttempts().size(),
response.getNMTokensFromPreviousAttempts().size()); response.getNMTokensFromPreviousAttempts().size());
@ -257,8 +256,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
* @throws IOException if finishAM call fails * @throws IOException if finishAM call fails
*/ */
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) FinishApplicationMasterRequest request) throws YarnException, IOException {
throws YarnException, IOException {
if (this.userUgi == null) { if (this.userUgi == null) {
if (this.connectionInitiated) { if (this.connectionInitiated) {
// This is possible if the async launchUAM is still // This is possible if the async launchUAM is still
@ -322,8 +321,7 @@ public void allocateAsync(AllocateRequest request,
LOG.info("Unmanaged AM still not successfully launched/registered yet." LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later."); + " Saving the allocate request and send later.");
} else { } else {
throw new YarnException( throw new YarnException("AllocateAsync should not be called before launchUAM");
"AllocateAsync should not be called before launchUAM");
} }
} }
} }
@ -358,7 +356,7 @@ public AMRMClientRelayer getAMRMClientRelayer() {
* Returns RM proxy for the specified protocol type. Unit test cases can * Returns RM proxy for the specified protocol type. Unit test cases can
* override this method and return mock proxy instances. * override this method and return mock proxy instances.
* *
* @param protocol protocal of the proxy * @param protocol protocol of the proxy
* @param config configuration * @param config configuration
* @param user ugi for the proxy connection * @param user ugi for the proxy connection
* @param token token for the connection * @param token token for the connection
@ -411,8 +409,8 @@ protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
} }
} }
private void submitUnmanagedApp(ApplicationId appId) private void submitUnmanagedApp(ApplicationId appId) throws YarnException, IOException {
throws YarnException, IOException {
SubmitApplicationRequest submitRequest = SubmitApplicationRequest submitRequest =
this.recordFactory.newRecordInstance(SubmitApplicationRequest.class); this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@ -422,8 +420,7 @@ private void submitUnmanagedApp(ApplicationId appId)
context.setApplicationId(appId); context.setApplicationId(appId);
context.setApplicationName(APP_NAME + "-" + appNameSuffix); context.setApplicationName(APP_NAME + "-" + appNameSuffix);
if (StringUtils.isBlank(this.queueName)) { if (StringUtils.isBlank(this.queueName)) {
context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, YarnConfiguration.DEFAULT_QUEUE_NAME));
YarnConfiguration.DEFAULT_QUEUE_NAME));
} else { } else {
context.setQueue(this.queueName); context.setQueue(this.queueName);
} }
@ -467,8 +464,7 @@ private void submitUnmanagedApp(ApplicationId appId)
* @throws IOException if getApplicationReport fails * @throws IOException if getApplicationReport fails
*/ */
private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId,
Set<YarnApplicationState> appStates, Set<YarnApplicationState> appStates, YarnApplicationAttemptState attemptState)
YarnApplicationAttemptState attemptState)
throws YarnException, IOException { throws YarnException, IOException {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -495,25 +491,26 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId,
} }
if (appAttemptId != null) { if (appAttemptId != null) {
GetApplicationAttemptReportRequest req = this.recordFactory GetApplicationAttemptReportRequest req =
.newRecordInstance(GetApplicationAttemptReportRequest.class); this.recordFactory.newRecordInstance(GetApplicationAttemptReportRequest.class);
req.setApplicationAttemptId(appAttemptId); req.setApplicationAttemptId(appAttemptId);
ApplicationAttemptReport attemptReport = this.rmClient GetApplicationAttemptReportResponse appAttemptReport =
.getApplicationAttemptReport(req).getApplicationAttemptReport(); this.rmClient.getApplicationAttemptReport(req);
if (attemptState ApplicationAttemptReport attemptReport = appAttemptReport.getApplicationAttemptReport();
.equals(attemptReport.getYarnApplicationAttemptState())) { YarnApplicationAttemptState appAttemptState =
attemptReport.getYarnApplicationAttemptState();
if (attemptState.equals(appAttemptState)) {
return attemptReport; return attemptReport;
} }
LOG.info("Current attempt state of " + appAttemptId + " is " LOG.info("Current attempt state of {} is {}, waiting for current attempt to reach {}.",
+ attemptReport.getYarnApplicationAttemptState() appAttemptId, appAttemptState, attemptState);
+ ", waiting for current attempt to reach " + attemptState);
} }
try { try {
Thread.sleep(this.asyncApiPollIntervalMillis); Thread.sleep(this.asyncApiPollIntervalMillis);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for current attempt of " + appId LOG.warn("Interrupted while waiting for current attempt of {} to reach {}.",
+ " to reach " + attemptState); appId, attemptState);
} }
if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
@ -538,8 +535,7 @@ protected Token<AMRMTokenIdentifier> getUAMToken()
if (amrmToken != null) { if (amrmToken != null) {
token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
} else { } else {
LOG.warn( LOG.warn("AMRMToken not found in the application report for application: {}",
"AMRMToken not found in the application report for application: {}",
this.applicationId); this.applicationId);
} }
return token; return token;

View File

@ -351,23 +351,15 @@ protected Token<AMRMTokenIdentifier> launchUAM(
ApplicationAttemptId appAttemptId) ApplicationAttemptId appAttemptId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return getUGIWithToken(appAttemptId) return getUGIWithToken(appAttemptId)
.doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() { .doAs((PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>) () -> uam.launchUAM());
@Override
public Token<AMRMTokenIdentifier> run() throws Exception {
return uam.launchUAM();
}
});
} }
protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken, protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken,
ApplicationAttemptId appAttemptId) ApplicationAttemptId appAttemptId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() { getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction<Object>) () -> {
@Override uam.reAttachUAM(uamToken);
public Token<AMRMTokenIdentifier> run() throws Exception { return null;
uam.reAttachUAM(uamToken);
return null;
}
}); });
} }
@ -376,25 +368,16 @@ protected RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId appAttemptId) ApplicationAttemptId appAttemptId)
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
return getUGIWithToken(appAttemptId).doAs( return getUGIWithToken(appAttemptId).doAs(
new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() { (PrivilegedExceptionAction<RegisterApplicationMasterResponse>)
@Override () -> uam.registerApplicationMaster(request));
public RegisterApplicationMasterResponse run()
throws YarnException, IOException {
return uam.registerApplicationMaster(request);
}
});
} }
protected void allocateAsync(final AllocateRequest request, protected void allocateAsync(final AllocateRequest request,
final AsyncCallback<AllocateResponse> callBack, final AsyncCallback<AllocateResponse> callBack, ApplicationAttemptId appAttemptId)
ApplicationAttemptId appAttemptId)
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() { getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction<Object>) () -> {
@Override uam.allocateAsync(request, callBack);
public Object run() throws YarnException { return null;
uam.allocateAsync(request, callBack);
return null;
}
}); });
} }
@ -402,16 +385,9 @@ protected FinishApplicationMasterResponse finishApplicationMaster(
final FinishApplicationMasterRequest request, final FinishApplicationMasterRequest request,
ApplicationAttemptId appAttemptId) ApplicationAttemptId appAttemptId)
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
return getUGIWithToken(appAttemptId) return getUGIWithToken(appAttemptId).doAs(
.doAs(new PrivilegedExceptionAction<FinishApplicationMasterResponse>() { (PrivilegedExceptionAction<FinishApplicationMasterResponse>) () ->
@Override uam.finishApplicationMaster(request));
public FinishApplicationMasterResponse run()
throws YarnException, IOException {
FinishApplicationMasterResponse response =
uam.finishApplicationMaster(request);
return response;
}
});
} }
protected class CountingCallback implements AsyncCallback<AllocateResponse> { protected class CountingCallback implements AsyncCallback<AllocateResponse> {
@ -497,14 +473,10 @@ public TestableAMRequestHandlerThread(Configuration conf,
@Override @Override
public void run() { public void run() {
try { try {
getUGIWithToken(attemptId) getUGIWithToken(attemptId).doAs((PrivilegedExceptionAction<Object>) () -> {
.doAs(new PrivilegedExceptionAction<Object>() { TestableAMRequestHandlerThread.super.run();
@Override return null;
public Object run() { });
TestableAMRequestHandlerThread.super.run();
return null;
}
});
} catch (Exception e) { } catch (Exception e) {
LOG.error("Exception running TestableAMRequestHandlerThread", e); LOG.error("Exception running TestableAMRequestHandlerThread", e);
} }