diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index aa7a84b0c2..0ff4260c5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -76,7 +76,7 @@ public class UnmanagedAMPoolManager extends AbstractService { private ExecutorService threadpool; - private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread"; + private final String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread"; private Thread finishApplicationThread; @@ -138,7 +138,7 @@ public String createAndRegisterNewUAM( boolean keepContainersAcrossApplicationAttempts, String rmName, ApplicationSubmissionContext originalAppSubmissionContext) throws YarnException, IOException { - ApplicationId appId = null; + ApplicationId appId; ApplicationClientProtocol rmClient; try { UserGroupInformation appSubmitter = @@ -198,14 +198,16 @@ public Token launchUAM(String uamId, Configuration conf, if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, rmName, originalAppSubmissionContext); + // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - Token amrmToken = null; + Token amrmToken; try { LOG.info("Launching UAM id {} for application {}", uamId, appId); amrmToken = uam.launchUAM(); @@ -390,7 +392,7 @@ public void shutDownConnections() throws YarnException { public Set getAllUAMIds() { // Return a clone of the current id set for concurrency reasons, so that the // returned map won't change with the actual map - return new HashSet(this.unmanagedAppMasterMap.keySet()); + return new HashSet<>(this.unmanagedAppMasterMap.keySet()); } /** @@ -439,7 +441,7 @@ public void drainUAMHeartbeats() { * * @param request FinishApplicationMasterRequest * @param appId application Id - * @return Returns the Map map, + * @return Returns the Map, * the key is subClusterId, the value is FinishApplicationMasterResponse */ public Map batchFinishApplicationMaster( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 3121e6d44d..56be136bda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -147,10 +148,8 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( - YarnConfiguration. - YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, - YarnConfiguration. - DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); this.keepContainersAcrossApplicationAttempts = keepContainersAcrossApplicationAttempts; this.applicationSubmissionContext = originalApplicationSubmissionContext; @@ -175,8 +174,7 @@ public Token launchUAM() this.connectionInitiated = true; // Blocking call to RM - Token amrmToken = - initializeUnmanagedAM(this.applicationId); + Token amrmToken = initializeUnmanagedAM(this.applicationId); // Creates the UAM connection createUAMProxy(amrmToken); @@ -217,8 +215,8 @@ protected void createUAMProxy(Token amrmToken) * @throws IOException if register fails */ public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) - throws YarnException, IOException { + RegisterApplicationMasterRequest request) throws YarnException, IOException { + // Save the register request for re-register later this.registerRequest = request; @@ -228,16 +226,17 @@ public RegisterApplicationMasterResponse registerApplicationMaster( this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); this.heartbeatHandler.resetLastResponseId(); - for (Container container : response.getContainersFromPreviousAttempts()) { - LOG.debug("RegisterUAM returned existing running container {}", - container.getId()); + if (LOG.isDebugEnabled()) { + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.debug("RegisterUAM returned existing running container {}", container.getId()); + } + + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.debug("RegisterUAM returned existing NM token for node {}", nmToken.getNodeId()); + } } - for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { - LOG.debug("RegisterUAM returned existing NM token for node {}", - nmToken.getNodeId()); - } - LOG.info( - "RegisterUAM returned {} existing running container and {} NM tokens", + + LOG.info("RegisterUAM returned {} existing running container and {} NM tokens", response.getContainersFromPreviousAttempts().size(), response.getNMTokensFromPreviousAttempts().size()); @@ -257,8 +256,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster( * @throws IOException if finishAM call fails */ public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) - throws YarnException, IOException { + FinishApplicationMasterRequest request) throws YarnException, IOException { + if (this.userUgi == null) { if (this.connectionInitiated) { // This is possible if the async launchUAM is still @@ -322,8 +321,7 @@ public void allocateAsync(AllocateRequest request, LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { - throw new YarnException( - "AllocateAsync should not be called before launchUAM"); + throw new YarnException("AllocateAsync should not be called before launchUAM"); } } } @@ -358,7 +356,7 @@ public AMRMClientRelayer getAMRMClientRelayer() { * Returns RM proxy for the specified protocol type. Unit test cases can * override this method and return mock proxy instances. * - * @param protocol protocal of the proxy + * @param protocol protocol of the proxy * @param config configuration * @param user ugi for the proxy connection * @param token token for the connection @@ -411,8 +409,8 @@ protected Token initializeUnmanagedAM( } } - private void submitUnmanagedApp(ApplicationId appId) - throws YarnException, IOException { + private void submitUnmanagedApp(ApplicationId appId) throws YarnException, IOException { + SubmitApplicationRequest submitRequest = this.recordFactory.newRecordInstance(SubmitApplicationRequest.class); @@ -422,8 +420,7 @@ private void submitUnmanagedApp(ApplicationId appId) context.setApplicationId(appId); context.setApplicationName(APP_NAME + "-" + appNameSuffix); if (StringUtils.isBlank(this.queueName)) { - context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, - YarnConfiguration.DEFAULT_QUEUE_NAME)); + context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, YarnConfiguration.DEFAULT_QUEUE_NAME)); } else { context.setQueue(this.queueName); } @@ -467,8 +464,7 @@ private void submitUnmanagedApp(ApplicationId appId) * @throws IOException if getApplicationReport fails */ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, - Set appStates, - YarnApplicationAttemptState attemptState) + Set appStates, YarnApplicationAttemptState attemptState) throws YarnException, IOException { long startTime = System.currentTimeMillis(); @@ -495,25 +491,26 @@ private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, } if (appAttemptId != null) { - GetApplicationAttemptReportRequest req = this.recordFactory - .newRecordInstance(GetApplicationAttemptReportRequest.class); + GetApplicationAttemptReportRequest req = + this.recordFactory.newRecordInstance(GetApplicationAttemptReportRequest.class); req.setApplicationAttemptId(appAttemptId); - ApplicationAttemptReport attemptReport = this.rmClient - .getApplicationAttemptReport(req).getApplicationAttemptReport(); - if (attemptState - .equals(attemptReport.getYarnApplicationAttemptState())) { + GetApplicationAttemptReportResponse appAttemptReport = + this.rmClient.getApplicationAttemptReport(req); + ApplicationAttemptReport attemptReport = appAttemptReport.getApplicationAttemptReport(); + YarnApplicationAttemptState appAttemptState = + attemptReport.getYarnApplicationAttemptState(); + if (attemptState.equals(appAttemptState)) { return attemptReport; } - LOG.info("Current attempt state of " + appAttemptId + " is " - + attemptReport.getYarnApplicationAttemptState() - + ", waiting for current attempt to reach " + attemptState); + LOG.info("Current attempt state of {} is {}, waiting for current attempt to reach {}.", + appAttemptId, appAttemptState, attemptState); } try { Thread.sleep(this.asyncApiPollIntervalMillis); } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for current attempt of " + appId - + " to reach " + attemptState); + LOG.warn("Interrupted while waiting for current attempt of {} to reach {}.", + appId, attemptState); } if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { @@ -538,8 +535,7 @@ protected Token getUAMToken() if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { - LOG.warn( - "AMRMToken not found in the application report for application: {}", + LOG.warn("AMRMToken not found in the application report for application: {}", this.applicationId); } return token; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 65266bf411..4a57e26cb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -351,23 +351,15 @@ protected Token launchUAM( ApplicationAttemptId appAttemptId) throws IOException, InterruptedException { return getUGIWithToken(appAttemptId) - .doAs(new PrivilegedExceptionAction>() { - @Override - public Token run() throws Exception { - return uam.launchUAM(); - } - }); + .doAs((PrivilegedExceptionAction>) () -> uam.launchUAM()); } protected void reAttachUAM(final Token uamToken, ApplicationAttemptId appAttemptId) throws IOException, InterruptedException { - getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { - @Override - public Token run() throws Exception { - uam.reAttachUAM(uamToken); - return null; - } + getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction) () -> { + uam.reAttachUAM(uamToken); + return null; }); } @@ -376,25 +368,16 @@ protected RegisterApplicationMasterResponse registerApplicationMaster( ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( - new PrivilegedExceptionAction() { - @Override - public RegisterApplicationMasterResponse run() - throws YarnException, IOException { - return uam.registerApplicationMaster(request); - } - }); + (PrivilegedExceptionAction) + () -> uam.registerApplicationMaster(request)); } protected void allocateAsync(final AllocateRequest request, - final AsyncCallback callBack, - ApplicationAttemptId appAttemptId) + final AsyncCallback callBack, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException { - getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws YarnException { - uam.allocateAsync(request, callBack); - return null; - } + getUGIWithToken(appAttemptId).doAs((PrivilegedExceptionAction) () -> { + uam.allocateAsync(request, callBack); + return null; }); } @@ -402,16 +385,9 @@ protected FinishApplicationMasterResponse finishApplicationMaster( final FinishApplicationMasterRequest request, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException { - return getUGIWithToken(appAttemptId) - .doAs(new PrivilegedExceptionAction() { - @Override - public FinishApplicationMasterResponse run() - throws YarnException, IOException { - FinishApplicationMasterResponse response = - uam.finishApplicationMaster(request); - return response; - } - }); + return getUGIWithToken(appAttemptId).doAs( + (PrivilegedExceptionAction) () -> + uam.finishApplicationMaster(request)); } protected class CountingCallback implements AsyncCallback { @@ -497,14 +473,10 @@ public TestableAMRequestHandlerThread(Configuration conf, @Override public void run() { try { - getUGIWithToken(attemptId) - .doAs(new PrivilegedExceptionAction() { - @Override - public Object run() { - TestableAMRequestHandlerThread.super.run(); - return null; - } - }); + getUGIWithToken(attemptId).doAs((PrivilegedExceptionAction) () -> { + TestableAMRequestHandlerThread.super.run(); + return null; + }); } catch (Exception e) { LOG.error("Exception running TestableAMRequestHandlerThread", e); }