diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 309c028580..bad2bacb1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -725,4 +725,13 @@ + + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 9da8f31fe4..b95e2c3184 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -123,6 +123,8 @@ .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_SUCCESS; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTPS_PREFIX; +import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTP_PREFIX; /** * @@ -153,10 +155,10 @@ public class ServiceScheduler extends CompositeService { private boolean timelineServiceEnabled; - // Global diagnostics that will be reported to RM on eRxit. + // Global diagnostics that will be reported to RM on exit. // The unit the number of characters. This will be limited to 64 * 1024 // characters. - private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); + private final BoundedAppender diagnostics = new BoundedAppender(64 * 1024); // A cache for loading config files from remote such as hdfs public LoadingCache configFileCache = null; @@ -168,7 +170,7 @@ public class ServiceScheduler extends CompositeService { private NMClientAsync nmClient; private AsyncDispatcher dispatcher; private YarnRegistryViewForProviders yarnRegistryOperations; - private ServiceContext context; + private final ServiceContext context; private ContainerLaunchService containerLaunchService; private final Map unRecoveredInstances = new ConcurrentHashMap<>(); @@ -185,10 +187,10 @@ public class ServiceScheduler extends CompositeService { private volatile FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.ENDED; - private Clock systemClock; + private final Clock systemClock; // For unit test override since we don't want to terminate UT process. - private ServiceUtils.ProcessTerminationHandler + private final ServiceUtils.ProcessTerminationHandler terminationHandler = new ServiceUtils.ProcessTerminationHandler(); public ServiceScheduler(ServiceContext context) { @@ -199,10 +201,10 @@ public ServiceScheduler(ServiceContext context) { } public void buildInstance(ServiceContext context, Configuration configuration) - throws YarnException, IOException { + throws YarnException { app = context.service; executorService = Executors.newScheduledThreadPool(10); - RegistryOperations registryClient = null; + RegistryOperations registryClient; if (UserGroupInformation.isSecurityEnabled() && !StringUtils.isEmpty(context.principal) && !StringUtils.isEmpty(context.keytab)) { @@ -480,7 +482,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) { } }); - if (unRecoveredInstances.size() > 0) { + if (!unRecoveredInstances.isEmpty()) { executorService.schedule(() -> { synchronized (unRecoveredInstances) { // after containerRecoveryTimeout, all the containers that haven't be @@ -532,7 +534,8 @@ private void createConfigFileCache(final FileSystem fileSystem) { this.configFileCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) .build(new CacheLoader() { - @Override public Object load(ConfigFile key) throws Exception { + @Override + public Object load(ConfigFile key) throws Exception { switch (key.getType()) { case HADOOP_XML: try (FSDataInputStream input = fileSystem @@ -560,9 +563,8 @@ private void createConfigFileCache(final FileSystem fileSystem) { } private void registerServiceInstance(ApplicationAttemptId attemptId, - Service service) throws IOException { - LOG.info("Registering " + attemptId + ", " + service.getName() - + " into registry"); + Service service) { + LOG.info("Registering {}, {} into registry.", attemptId, service.getName()); ServiceRecord serviceRecord = new ServiceRecord(); serviceRecord.set(YarnRegistryAttributes.YARN_ID, attemptId.getApplicationId().toString()); @@ -570,24 +572,21 @@ private void registerServiceInstance(ApplicationAttemptId attemptId, PersistencePolicies.APPLICATION); serviceRecord.description = "YarnServiceMaster"; - executorService.submit(new Runnable() { - @Override public void run() { - try { - yarnRegistryOperations.registerSelf(serviceRecord, false); - LOG.info("Registered service under {}; absolute path {}", - yarnRegistryOperations.getSelfRegistrationPath(), - yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); - boolean isFirstAttempt = 1 == attemptId.getAttemptId(); - // delete the children in case there are any and this is an AM startup. - // just to make sure everything underneath is purged - if (isFirstAttempt) { - yarnRegistryOperations.deleteChildren( - yarnRegistryOperations.getSelfRegistrationPath(), true); - } - } catch (IOException e) { - LOG.error( - "Failed to register app " + app.getName() + " in registry", e); + executorService.submit(() -> { + try { + yarnRegistryOperations.registerSelf(serviceRecord, false); + LOG.info("Registered service under {}; absolute path {}", + yarnRegistryOperations.getSelfRegistrationPath(), + yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); + boolean isFirstAttempt = 1 == attemptId.getAttemptId(); + // delete the children in case there are any and this is an AM startup. + // just to make sure everything underneath is purged + if (isFirstAttempt) { + yarnRegistryOperations.deleteChildren( + yarnRegistryOperations.getSelfRegistrationPath(), true); } + } catch (IOException e) { + LOG.error("Failed to register app {} in registry.", app.getName(), e); } }); if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { @@ -637,7 +636,7 @@ public void handle(ComponentEvent event) { Component component = componentsByName.get(event.getName()); if (component == null) { - LOG.error("No component exists for " + event.getName()); + LOG.error("No component exists for {}.", event.getName()); return; } try { @@ -657,14 +656,14 @@ public void handle(ComponentInstanceEvent event) { ComponentInstance instance = liveInstances.get(event.getContainerId()); if (instance == null) { - LOG.error("No component instance exists for " + event.getContainerId()); + LOG.error("No component instance exists for {}.", event.getContainerId()); return; } try { instance.handle(event); } catch (Throwable t) { - LOG.error(instance.getCompInstanceId() + - ": Error in handling event type " + event.getType(), t); + LOG.error("{} : Error in handling event type {}.", + instance.getCompInstanceId(), event.getType(), t); } } } @@ -673,7 +672,7 @@ class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler { @Override public void onContainersAllocated(List containers) { - LOG.info(containers.size() + " containers allocated. "); + LOG.info("{} containers allocated. ", containers.size()); for (Container container : containers) { Component comp = componentsById.get(container.getAllocationRequestId()); ComponentEvent event = @@ -684,8 +683,8 @@ public void onContainersAllocated(List containers) { Collection requests = amRMClient .getMatchingRequests(container.getAllocationRequestId()); LOG.info("[COMPONENT {}]: remove {} outstanding container requests " + - "for allocateId " + container.getAllocationRequestId(), - comp.getName(), requests.size()); + "for allocateId {}.", comp.getName(), requests.size(), + container.getAllocationRequestId()); // remove the corresponding request if (requests.iterator().hasNext()) { AMRMClient.ContainerRequest request = requests.iterator().next(); @@ -799,7 +798,7 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler { Map allServiceResponse) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + LOG.error("No component instance exists for {}.", containerId); return; } ComponentEvent event = @@ -821,10 +820,10 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler { public void onStartContainerError(ContainerId containerId, Throwable t) { ComponentInstance instance = liveInstances.get(containerId); if (instance == null) { - LOG.error("No component instance exists for " + containerId); + LOG.error("No component instance exists for {}.", containerId); return; } - LOG.error("Failed to start " + containerId, t); + LOG.error("Failed to start {}.", containerId, t); amRMClient.releaseAssignedContainer(containerId); // After container released, it'll get CONTAINER_COMPLETED event from RM // automatically which will trigger stopping COMPONENT INSTANCE @@ -950,15 +949,14 @@ public boolean hasAtLeastOnePlacementConstraint() { } public boolean terminateServiceIfNeeded(Component component) { - boolean serviceIsTerminated = + return terminateServiceIfDominantComponentFinished(component) || terminateServiceIfAllComponentsFinished(); - return serviceIsTerminated; } /** * If the service state component is finished, the service is also terminated. - * @param component + * @param component service component. */ private boolean terminateServiceIfDominantComponentFinished(Component component) { @@ -981,8 +979,7 @@ private boolean terminateServiceIfDominantComponentFinished(Component state); component.getComponentSpec().setState(state); LOG.info("Dominate component {} finished, exiting Service Master... " + - ", final status=" + (isSucceeded ? "Succeeded" : "Failed"), - component.getName()); + ", final status={}.", component.getName(), (isSucceeded ? "Succeeded" : "Failed")); terminateService(isSucceeded); } } @@ -1042,14 +1039,10 @@ private boolean terminateServiceIfAllComponentsFinished() { } if (shouldTerminate) { - LOG.info("All component finished, exiting Service Master... " - + ", final status=" + (failedComponents.isEmpty() ? - "Succeeded" : - "Failed")); - LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils - .join(succeededComponents, ",") + "]"); - LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils - .join(failedComponents, ",") + "]"); + LOG.info("All component finished, exiting Service Master... " + + ", final status={}", (failedComponents.isEmpty() ? "Succeeded" : "Failed")); + LOG.info("Succeeded components: [" + StringUtils.join(succeededComponents, ",") + "]"); + LOG.info("Failed components: [" + StringUtils.join(failedComponents, ",") + "]"); terminateService(failedComponents.isEmpty()); } @@ -1093,7 +1086,7 @@ public void syncSysFs(Service yarnApp) { spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp); for (org.apache.hadoop.yarn.service.api.records.Component c : yarnApp.getComponents()) { - Set nodes = new HashSet(); + Set nodes = new HashSet<>(); boolean update = Boolean.parseBoolean(c.getConfiguration() .getEnv(ApplicationConstants.Environment .YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name())); @@ -1109,9 +1102,9 @@ public void syncSysFs(Service yarnApp) { for (String bareHost : nodes) { StringBuilder requestPath = new StringBuilder(); if (YarnConfiguration.useHttps(conf)) { - requestPath.append("https://"); + requestPath.append(HTTPS_PREFIX); } else { - requestPath.append("http://"); + requestPath.append(HTTP_PREFIX); } requestPath.append(bareHost) .append(":") @@ -1129,8 +1122,7 @@ public void syncSysFs(Service yarnApp) { Builder builder = HttpUtil.connect(requestPath.toString()); ClientResponse response = builder.put(ClientResponse.class, spec); if (response.getStatus()!=ClientResponse.Status.OK.getStatusCode()) { - LOG.warn("Error synchronize YARN sysfs: " + - response.getEntity(String.class)); + LOG.warn("Error synchronize YARN sysfs: {}.", response.getEntity(String.class)); success = false; } }