YARN-11711. Clean Up ServiceScheduler Code. (#6977) Contributed by Shilun Fan.

Reviewed-by: Steve Loughran <stevel@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2024-08-22 17:49:42 +08:00 committed by GitHub
parent f6c45e0bcf
commit 6be04633b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 58 additions and 57 deletions

View File

@ -725,4 +725,13 @@
<Match> <Match>
<Package name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema" /> <Package name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema" />
</Match> </Match>
<!-- The anonymous CacheLoader created in ServiceScheduler#createConfigFileCache
overrides the `load` method, which is not allowed to return null; the resulting
NP_NONNULL_RETURN_VIOLATION warning is excluded here. -->
<Match>
<Class name="org.apache.hadoop.yarn.service.ServiceScheduler"/>
<Method name="$1.load(ConfigFile)" />
<Bug pattern="NP_NONNULL_RETURN_VIOLATION"/>
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -123,6 +123,8 @@
.EXIT_FALSE; .EXIT_FALSE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
.EXIT_SUCCESS; .EXIT_SUCCESS;
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTPS_PREFIX;
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTP_PREFIX;
/** /**
* *
@ -153,10 +155,10 @@ public class ServiceScheduler extends CompositeService {
private boolean timelineServiceEnabled; private boolean timelineServiceEnabled;
// Global diagnostics that will be reported to RM on eRxit. // Global diagnostics that will be reported to RM on exit.
// The unit is the number of characters. This will be limited to 64 * 1024 // The unit is the number of characters. This will be limited to 64 * 1024
// characters. // characters.
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); private final BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
// A cache for loading config files from remote such as hdfs // A cache for loading config files from remote such as hdfs
public LoadingCache<ConfigFile, Object> configFileCache = null; public LoadingCache<ConfigFile, Object> configFileCache = null;
@ -168,7 +170,7 @@ public class ServiceScheduler extends CompositeService {
private NMClientAsync nmClient; private NMClientAsync nmClient;
private AsyncDispatcher dispatcher; private AsyncDispatcher dispatcher;
private YarnRegistryViewForProviders yarnRegistryOperations; private YarnRegistryViewForProviders yarnRegistryOperations;
private ServiceContext context; private final ServiceContext context;
private ContainerLaunchService containerLaunchService; private ContainerLaunchService containerLaunchService;
private final Map<ContainerId, ComponentInstance> unRecoveredInstances = private final Map<ContainerId, ComponentInstance> unRecoveredInstances =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
@ -185,10 +187,10 @@ public class ServiceScheduler extends CompositeService {
private volatile FinalApplicationStatus finalApplicationStatus = private volatile FinalApplicationStatus finalApplicationStatus =
FinalApplicationStatus.ENDED; FinalApplicationStatus.ENDED;
private Clock systemClock; private final Clock systemClock;
// For unit test override since we don't want to terminate UT process. // For unit test override since we don't want to terminate UT process.
private ServiceUtils.ProcessTerminationHandler private final ServiceUtils.ProcessTerminationHandler
terminationHandler = new ServiceUtils.ProcessTerminationHandler(); terminationHandler = new ServiceUtils.ProcessTerminationHandler();
public ServiceScheduler(ServiceContext context) { public ServiceScheduler(ServiceContext context) {
@ -199,10 +201,10 @@ public ServiceScheduler(ServiceContext context) {
} }
public void buildInstance(ServiceContext context, Configuration configuration) public void buildInstance(ServiceContext context, Configuration configuration)
throws YarnException, IOException { throws YarnException {
app = context.service; app = context.service;
executorService = Executors.newScheduledThreadPool(10); executorService = Executors.newScheduledThreadPool(10);
RegistryOperations registryClient = null; RegistryOperations registryClient;
if (UserGroupInformation.isSecurityEnabled() && if (UserGroupInformation.isSecurityEnabled() &&
!StringUtils.isEmpty(context.principal) !StringUtils.isEmpty(context.principal)
&& !StringUtils.isEmpty(context.keytab)) { && !StringUtils.isEmpty(context.keytab)) {
@ -480,7 +482,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) {
} }
}); });
if (unRecoveredInstances.size() > 0) { if (!unRecoveredInstances.isEmpty()) {
executorService.schedule(() -> { executorService.schedule(() -> {
synchronized (unRecoveredInstances) { synchronized (unRecoveredInstances) {
// after containerRecoveryTimeout, all the containers that haven't been // after containerRecoveryTimeout, all the containers that haven't been
@ -532,7 +534,8 @@ private void createConfigFileCache(final FileSystem fileSystem) {
this.configFileCache = this.configFileCache =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
.build(new CacheLoader<ConfigFile, Object>() { .build(new CacheLoader<ConfigFile, Object>() {
@Override public Object load(ConfigFile key) throws Exception { @Override
public Object load(ConfigFile key) throws Exception {
switch (key.getType()) { switch (key.getType()) {
case HADOOP_XML: case HADOOP_XML:
try (FSDataInputStream input = fileSystem try (FSDataInputStream input = fileSystem
@ -560,9 +563,8 @@ private void createConfigFileCache(final FileSystem fileSystem) {
} }
private void registerServiceInstance(ApplicationAttemptId attemptId, private void registerServiceInstance(ApplicationAttemptId attemptId,
Service service) throws IOException { Service service) {
LOG.info("Registering " + attemptId + ", " + service.getName() LOG.info("Registering {}, {} into registry.", attemptId, service.getName());
+ " into registry");
ServiceRecord serviceRecord = new ServiceRecord(); ServiceRecord serviceRecord = new ServiceRecord();
serviceRecord.set(YarnRegistryAttributes.YARN_ID, serviceRecord.set(YarnRegistryAttributes.YARN_ID,
attemptId.getApplicationId().toString()); attemptId.getApplicationId().toString());
@ -570,24 +572,21 @@ private void registerServiceInstance(ApplicationAttemptId attemptId,
PersistencePolicies.APPLICATION); PersistencePolicies.APPLICATION);
serviceRecord.description = "YarnServiceMaster"; serviceRecord.description = "YarnServiceMaster";
executorService.submit(new Runnable() { executorService.submit(() -> {
@Override public void run() { try {
try { yarnRegistryOperations.registerSelf(serviceRecord, false);
yarnRegistryOperations.registerSelf(serviceRecord, false); LOG.info("Registered service under {}; absolute path {}",
LOG.info("Registered service under {}; absolute path {}", yarnRegistryOperations.getSelfRegistrationPath(),
yarnRegistryOperations.getSelfRegistrationPath(), yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); boolean isFirstAttempt = 1 == attemptId.getAttemptId();
boolean isFirstAttempt = 1 == attemptId.getAttemptId(); // delete the children in case there are any and this is an AM startup.
// delete the children in case there are any and this is an AM startup. // just to make sure everything underneath is purged
// just to make sure everything underneath is purged if (isFirstAttempt) {
if (isFirstAttempt) { yarnRegistryOperations.deleteChildren(
yarnRegistryOperations.deleteChildren( yarnRegistryOperations.getSelfRegistrationPath(), true);
yarnRegistryOperations.getSelfRegistrationPath(), true);
}
} catch (IOException e) {
LOG.error(
"Failed to register app " + app.getName() + " in registry", e);
} }
} catch (IOException e) {
LOG.error("Failed to register app {} in registry.", app.getName(), e);
} }
}); });
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
@ -637,7 +636,7 @@ public void handle(ComponentEvent event) {
Component component = componentsByName.get(event.getName()); Component component = componentsByName.get(event.getName());
if (component == null) { if (component == null) {
LOG.error("No component exists for " + event.getName()); LOG.error("No component exists for {}.", event.getName());
return; return;
} }
try { try {
@ -657,14 +656,14 @@ public void handle(ComponentInstanceEvent event) {
ComponentInstance instance = ComponentInstance instance =
liveInstances.get(event.getContainerId()); liveInstances.get(event.getContainerId());
if (instance == null) { if (instance == null) {
LOG.error("No component instance exists for " + event.getContainerId()); LOG.error("No component instance exists for {}.", event.getContainerId());
return; return;
} }
try { try {
instance.handle(event); instance.handle(event);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error(instance.getCompInstanceId() + LOG.error("{} : Error in handling event type {}.",
": Error in handling event type " + event.getType(), t); instance.getCompInstanceId(), event.getType(), t);
} }
} }
} }
@ -673,7 +672,7 @@ class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
@Override @Override
public void onContainersAllocated(List<Container> containers) { public void onContainersAllocated(List<Container> containers) {
LOG.info(containers.size() + " containers allocated. "); LOG.info("{} containers allocated. ", containers.size());
for (Container container : containers) { for (Container container : containers) {
Component comp = componentsById.get(container.getAllocationRequestId()); Component comp = componentsById.get(container.getAllocationRequestId());
ComponentEvent event = ComponentEvent event =
@ -684,8 +683,8 @@ public void onContainersAllocated(List<Container> containers) {
Collection<AMRMClient.ContainerRequest> requests = amRMClient Collection<AMRMClient.ContainerRequest> requests = amRMClient
.getMatchingRequests(container.getAllocationRequestId()); .getMatchingRequests(container.getAllocationRequestId());
LOG.info("[COMPONENT {}]: remove {} outstanding container requests " + LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
"for allocateId " + container.getAllocationRequestId(), "for allocateId {}.", comp.getName(), requests.size(),
comp.getName(), requests.size()); container.getAllocationRequestId());
// remove the corresponding request // remove the corresponding request
if (requests.iterator().hasNext()) { if (requests.iterator().hasNext()) {
AMRMClient.ContainerRequest request = requests.iterator().next(); AMRMClient.ContainerRequest request = requests.iterator().next();
@ -799,7 +798,7 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
Map<String, ByteBuffer> allServiceResponse) { Map<String, ByteBuffer> allServiceResponse) {
ComponentInstance instance = liveInstances.get(containerId); ComponentInstance instance = liveInstances.get(containerId);
if (instance == null) { if (instance == null) {
LOG.error("No component instance exists for " + containerId); LOG.error("No component instance exists for {}.", containerId);
return; return;
} }
ComponentEvent event = ComponentEvent event =
@ -821,10 +820,10 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
public void onStartContainerError(ContainerId containerId, Throwable t) { public void onStartContainerError(ContainerId containerId, Throwable t) {
ComponentInstance instance = liveInstances.get(containerId); ComponentInstance instance = liveInstances.get(containerId);
if (instance == null) { if (instance == null) {
LOG.error("No component instance exists for " + containerId); LOG.error("No component instance exists for {}.", containerId);
return; return;
} }
LOG.error("Failed to start " + containerId, t); LOG.error("Failed to start {}.", containerId, t);
amRMClient.releaseAssignedContainer(containerId); amRMClient.releaseAssignedContainer(containerId);
// After container released, it'll get CONTAINER_COMPLETED event from RM // After container released, it'll get CONTAINER_COMPLETED event from RM
// automatically which will trigger stopping COMPONENT INSTANCE // automatically which will trigger stopping COMPONENT INSTANCE
@ -950,15 +949,14 @@ public boolean hasAtLeastOnePlacementConstraint() {
} }
public boolean terminateServiceIfNeeded(Component component) { public boolean terminateServiceIfNeeded(Component component) {
boolean serviceIsTerminated = return
terminateServiceIfDominantComponentFinished(component) || terminateServiceIfDominantComponentFinished(component) ||
terminateServiceIfAllComponentsFinished(); terminateServiceIfAllComponentsFinished();
return serviceIsTerminated;
} }
/** /**
* If the service state component is finished, the service is also terminated. * If the service state component is finished, the service is also terminated.
* @param component * @param component service component.
*/ */
private boolean terminateServiceIfDominantComponentFinished(Component private boolean terminateServiceIfDominantComponentFinished(Component
component) { component) {
@ -981,8 +979,7 @@ private boolean terminateServiceIfDominantComponentFinished(Component
state); state);
component.getComponentSpec().setState(state); component.getComponentSpec().setState(state);
LOG.info("Dominate component {} finished, exiting Service Master... " + LOG.info("Dominate component {} finished, exiting Service Master... " +
", final status=" + (isSucceeded ? "Succeeded" : "Failed"), ", final status={}.", component.getName(), (isSucceeded ? "Succeeded" : "Failed"));
component.getName());
terminateService(isSucceeded); terminateService(isSucceeded);
} }
} }
@ -1042,14 +1039,10 @@ private boolean terminateServiceIfAllComponentsFinished() {
} }
if (shouldTerminate) { if (shouldTerminate) {
LOG.info("All component finished, exiting Service Master... " LOG.info("All component finished, exiting Service Master... " +
+ ", final status=" + (failedComponents.isEmpty() ? ", final status={}", (failedComponents.isEmpty() ? "Succeeded" : "Failed"));
"Succeeded" : LOG.info("Succeeded components: [" + StringUtils.join(succeededComponents, ",") + "]");
"Failed")); LOG.info("Failed components: [" + StringUtils.join(failedComponents, ",") + "]");
LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils
.join(succeededComponents, ",") + "]");
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
.join(failedComponents, ",") + "]");
terminateService(failedComponents.isEmpty()); terminateService(failedComponents.isEmpty());
} }
@ -1093,7 +1086,7 @@ public void syncSysFs(Service yarnApp) {
spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp); spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp);
for (org.apache.hadoop.yarn.service.api.records.Component c : for (org.apache.hadoop.yarn.service.api.records.Component c :
yarnApp.getComponents()) { yarnApp.getComponents()) {
Set<String> nodes = new HashSet<String>(); Set<String> nodes = new HashSet<>();
boolean update = Boolean.parseBoolean(c.getConfiguration() boolean update = Boolean.parseBoolean(c.getConfiguration()
.getEnv(ApplicationConstants.Environment .getEnv(ApplicationConstants.Environment
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name())); .YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()));
@ -1109,9 +1102,9 @@ public void syncSysFs(Service yarnApp) {
for (String bareHost : nodes) { for (String bareHost : nodes) {
StringBuilder requestPath = new StringBuilder(); StringBuilder requestPath = new StringBuilder();
if (YarnConfiguration.useHttps(conf)) { if (YarnConfiguration.useHttps(conf)) {
requestPath.append("https://"); requestPath.append(HTTPS_PREFIX);
} else { } else {
requestPath.append("http://"); requestPath.append(HTTP_PREFIX);
} }
requestPath.append(bareHost) requestPath.append(bareHost)
.append(":") .append(":")
@ -1129,8 +1122,7 @@ public void syncSysFs(Service yarnApp) {
Builder builder = HttpUtil.connect(requestPath.toString()); Builder builder = HttpUtil.connect(requestPath.toString());
ClientResponse response = builder.put(ClientResponse.class, spec); ClientResponse response = builder.put(ClientResponse.class, spec);
if (response.getStatus()!=ClientResponse.Status.OK.getStatusCode()) { if (response.getStatus()!=ClientResponse.Status.OK.getStatusCode()) {
LOG.warn("Error synchronize YARN sysfs: " + LOG.warn("Error synchronize YARN sysfs: {}.", response.getEntity(String.class));
response.getEntity(String.class));
success = false; success = false;
} }
} }