YARN-8126. Support auto-spawning of admin configured services during bootstrap of RM. Contributed by Rohith Sharma K S.
This commit is contained in:
parent
c4d3636c21
commit
427ad7ecc4
@ -167,6 +167,7 @@
|
|||||||
<item name="Concepts" href="hadoop-yarn/hadoop-yarn-site/yarn-service/Concepts.html"/>
|
<item name="Concepts" href="hadoop-yarn/hadoop-yarn-site/yarn-service/Concepts.html"/>
|
||||||
<item name="Yarn Service API" href="hadoop-yarn/hadoop-yarn-site/yarn-service/YarnServiceAPI.html"/>
|
<item name="Yarn Service API" href="hadoop-yarn/hadoop-yarn-site/yarn-service/YarnServiceAPI.html"/>
|
||||||
<item name="Service Discovery" href="hadoop-yarn/hadoop-yarn-site/yarn-service/ServiceDiscovery.html"/>
|
<item name="Service Discovery" href="hadoop-yarn/hadoop-yarn-site/yarn-service/ServiceDiscovery.html"/>
|
||||||
|
<item name="System Services" href="hadoop-yarn/hadoop-yarn-site/yarn-service/SystemServices.html"/>
|
||||||
</menu>
|
</menu>
|
||||||
|
|
||||||
<menu name="Hadoop Compatible File Systems" inherit="top">
|
<menu name="Hadoop Compatible File Systems" inherit="top">
|
||||||
|
@ -92,10 +92,12 @@ public class SystemServiceManagerImpl extends AbstractService
|
|||||||
private Thread serviceLaucher;
|
private Thread serviceLaucher;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private int skipCounter;
|
private int badFileNameExtensionSkipCounter;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private Map<String, Integer> ignoredUserServices =
|
private Map<String, Integer> ignoredUserServices =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
@VisibleForTesting
|
||||||
|
private int badDirSkipCounter;
|
||||||
|
|
||||||
public SystemServiceManagerImpl() {
|
public SystemServiceManagerImpl() {
|
||||||
super(SystemServiceManagerImpl.class.getName());
|
super(SystemServiceManagerImpl.class.getName());
|
||||||
@ -268,6 +270,7 @@ void scanForUserServices() throws IOException {
|
|||||||
} else if (launchType.getPath().getName().equals(ASYNC)) {
|
} else if (launchType.getPath().getName().equals(ASYNC)) {
|
||||||
scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
|
scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
|
||||||
} else {
|
} else {
|
||||||
|
badDirSkipCounter++;
|
||||||
LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
|
LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -308,7 +311,7 @@ private void scanForUserServiceDefinition(Path userDirPath,
|
|||||||
if (!filename.endsWith(YARN_FILE_SUFFIX)) {
|
if (!filename.endsWith(YARN_FILE_SUFFIX)) {
|
||||||
LOG.info("Scanner skips for unknown file extension, filename = {}",
|
LOG.info("Scanner skips for unknown file extension, filename = {}",
|
||||||
filename);
|
filename);
|
||||||
skipCounter++;
|
badFileNameExtensionSkipCounter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Service service = getServiceDefinition(serviceCache.getPath());
|
Service service = getServiceDefinition(serviceCache.getPath());
|
||||||
@ -325,13 +328,14 @@ private void scanForUserServiceDefinition(Path userDirPath,
|
|||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Ignoring service {} for the user {} as it is already present,"
|
"Ignoring service {} for the user {} as it is already present,"
|
||||||
+ " filename = {}", service.getName(), userName, filename);
|
+ " filename = {}", service.getName(), userName, filename);
|
||||||
}
|
} else {
|
||||||
LOG.info("Added service {} for the user {}, filename = {}",
|
LOG.info("Added service {} for the user {}, filename = {}",
|
||||||
service.getName(), userName, filename);
|
service.getName(), userName, filename);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Service getServiceDefinition(Path filePath) {
|
private Service getServiceDefinition(Path filePath) {
|
||||||
Service service = null;
|
Service service = null;
|
||||||
@ -375,7 +379,13 @@ Map<String, Set<Service>> getSyncUserServices() {
|
|||||||
return syncUserServices;
|
return syncUserServices;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting int getSkipCounter() {
|
@VisibleForTesting
|
||||||
return skipCounter;
|
int getBadFileNameExtensionSkipCounter() {
|
||||||
|
return badFileNameExtensionSkipCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getBadDirSkipCounter() {
|
||||||
|
return badDirSkipCounter;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -41,13 +41,13 @@
|
|||||||
/**
|
/**
|
||||||
* Test class for system service manager.
|
* Test class for system service manager.
|
||||||
*/
|
*/
|
||||||
public class TestSystemServiceImpl {
|
public class TestSystemServiceManagerImpl {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestSystemServiceImpl.class);
|
LoggerFactory.getLogger(TestSystemServiceManagerImpl.class);
|
||||||
private SystemServiceManagerImpl systemService;
|
private SystemServiceManagerImpl systemService;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private String resourcePath = "users";
|
private String resourcePath = "system-services";
|
||||||
|
|
||||||
private String[] users = new String[] {"user1", "user2"};
|
private String[] users = new String[] {"user1", "user2"};
|
||||||
private static Map<String, Set<String>> loadedServices = new HashMap<>();
|
private static Map<String, Set<String>> loadedServices = new HashMap<>();
|
||||||
@ -88,7 +88,9 @@ public void testSystemServiceSubmission() throws Exception {
|
|||||||
ignoredUserServices.containsKey(users[0]));
|
ignoredUserServices.containsKey(users[0]));
|
||||||
int count = ignoredUserServices.get(users[0]);
|
int count = ignoredUserServices.get(users[0]);
|
||||||
Assert.assertEquals(1, count);
|
Assert.assertEquals(1, count);
|
||||||
Assert.assertEquals(1, systemService.getSkipCounter());
|
Assert.assertEquals(1,
|
||||||
|
systemService.getBadFileNameExtensionSkipCounter());
|
||||||
|
Assert.assertEquals(1, systemService.getBadDirSkipCounter());
|
||||||
|
|
||||||
Map<String, Set<Service>> userServices =
|
Map<String, Set<Service>> userServices =
|
||||||
systemService.getSyncUserServices();
|
systemService.getSyncUserServices();
|
||||||
@ -112,7 +114,7 @@ private void verifyForScannedUserServices(
|
|||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
Service next = iterator.next();
|
Service next = iterator.next();
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
"Service name doesn't exist in expected " + "userService "
|
"Service name doesn't exist in expected userService "
|
||||||
+ serviceNames, serviceNames.contains(next.getName()));
|
+ serviceNames, serviceNames.contains(next.getName()));
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
"name": "bad",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"components" :
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "simple",
|
||||||
|
"number_of_containers": 1,
|
||||||
|
"launch_command": "sleep 2",
|
||||||
|
"resource": {
|
||||||
|
"cpus": 1,
|
||||||
|
"memory": "128"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -1,156 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
|
||||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
|
||||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
|
||||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests for {@link ServiceManager}.
|
|
||||||
*/
|
|
||||||
public class TestSystemServiceManager {
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
|
||||||
new ServiceTestUtils.ServiceFSWatcher();
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpgrade() throws IOException, SliderException {
|
|
||||||
ServiceManager serviceManager = createTestServiceManager("testUpgrade");
|
|
||||||
upgrade(serviceManager, "v2", false);
|
|
||||||
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
|
|
||||||
serviceManager.getServiceSpec().getState());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRestartNothingToUpgrade()
|
|
||||||
throws IOException, SliderException {
|
|
||||||
ServiceManager serviceManager = createTestServiceManager("testRestart");
|
|
||||||
upgrade(serviceManager, "v2", false);
|
|
||||||
|
|
||||||
//make components stable
|
|
||||||
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
|
|
||||||
comp.setState(ComponentState.STABLE);
|
|
||||||
});
|
|
||||||
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
|
|
||||||
Assert.assertEquals("service not re-started", ServiceState.STABLE,
|
|
||||||
serviceManager.getServiceSpec().getState());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRestartWithPendingUpgrade()
|
|
||||||
throws IOException, SliderException {
|
|
||||||
ServiceManager serviceManager = createTestServiceManager("testRestart");
|
|
||||||
upgrade(serviceManager, "v2", true);
|
|
||||||
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
|
|
||||||
Assert.assertEquals("service should still be upgrading",
|
|
||||||
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void upgrade(ServiceManager service, String version,
|
|
||||||
boolean upgradeArtifact)
|
|
||||||
throws IOException, SliderException {
|
|
||||||
Service upgradedDef = ServiceTestUtils.createExampleApplication();
|
|
||||||
upgradedDef.setName(service.getName());
|
|
||||||
upgradedDef.setVersion(version);
|
|
||||||
if (upgradeArtifact) {
|
|
||||||
Artifact upgradedArtifact = createTestArtifact("2");
|
|
||||||
upgradedDef.getComponents().forEach(component -> {
|
|
||||||
component.setArtifact(upgradedArtifact);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
writeUpgradedDef(upgradedDef);
|
|
||||||
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
|
|
||||||
upgradeEvent.setVersion("v2");
|
|
||||||
service.handle(upgradeEvent);
|
|
||||||
}
|
|
||||||
|
|
||||||
private ServiceManager createTestServiceManager(String name)
|
|
||||||
throws IOException {
|
|
||||||
ServiceContext context = new ServiceContext();
|
|
||||||
context.service = createBaseDef(name);
|
|
||||||
context.fs = rule.getFs();
|
|
||||||
|
|
||||||
context.scheduler = new ServiceScheduler(context) {
|
|
||||||
@Override
|
|
||||||
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
|
||||||
ServiceContext context, RegistryOperations registryClient) {
|
|
||||||
return mock(YarnRegistryViewForProviders.class);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
context.scheduler.init(rule.getConf());
|
|
||||||
|
|
||||||
Map<String, org.apache.hadoop.yarn.service.component.Component>
|
|
||||||
componentState = context.scheduler.getAllComponents();
|
|
||||||
context.service.getComponents().forEach(component -> {
|
|
||||||
componentState.put(component.getName(),
|
|
||||||
new org.apache.hadoop.yarn.service.component.Component(component,
|
|
||||||
1L, context));
|
|
||||||
});
|
|
||||||
return new ServiceManager(context);
|
|
||||||
}
|
|
||||||
|
|
||||||
static Service createBaseDef(String name) {
|
|
||||||
ApplicationId applicationId = ApplicationId.newInstance(
|
|
||||||
System.currentTimeMillis(), 1);
|
|
||||||
Service serviceDef = ServiceTestUtils.createExampleApplication();
|
|
||||||
serviceDef.setId(applicationId.toString());
|
|
||||||
serviceDef.setName(name);
|
|
||||||
serviceDef.setState(ServiceState.STARTED);
|
|
||||||
Artifact artifact = createTestArtifact("1");
|
|
||||||
|
|
||||||
serviceDef.getComponents().forEach(component ->
|
|
||||||
component.setArtifact(artifact));
|
|
||||||
return serviceDef;
|
|
||||||
}
|
|
||||||
|
|
||||||
static Artifact createTestArtifact(String artifactId) {
|
|
||||||
Artifact artifact = new Artifact();
|
|
||||||
artifact.setId(artifactId);
|
|
||||||
artifact.setType(Artifact.TypeEnum.TARBALL);
|
|
||||||
return artifact;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void writeUpgradedDef(Service upgradedDef)
|
|
||||||
throws IOException, SliderException {
|
|
||||||
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
|
|
||||||
upgradedDef.getName(), upgradedDef.getVersion());
|
|
||||||
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
|
|
||||||
upgradedDef);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -0,0 +1,66 @@
|
|||||||
|
<!---
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
|
||||||
|
# System Services
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
System services are admin configured services which are auto deployed during bootstrap of ResourceManager. This would work only when API-Server is started as part of ResourceManager. Refer [Manage services on YARN](QuickStart.html#Manage_services_on_YARN_via_REST_API). This document describes how to configure and deploy system services.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
| Name | Description |
|
||||||
|
| ------------ | ------------- |
|
||||||
|
|yarn.service.system-service.dir| FS directory path to load and deploy admin configured services. These service spec files should be kept with proper hierarchy.|
|
||||||
|
|
||||||
|
## Hierarchy of FS path
|
||||||
|
After configuring *yarn.service.system-service.dir* path, the spec files should be kept with below hierarchy.
|
||||||
|
````
|
||||||
|
$SYSTEM_SERVICE_DIR_PATH/<Launch-Mode>/<Users>/<Yarnfiles>.
|
||||||
|
````
|
||||||
|
### Launch-Mode
|
||||||
|
Launch-Mode indicates that how the service should be deployed. Services can be auto deployed either synchronously or asynchronously.
|
||||||
|
|
||||||
|
#### sync
|
||||||
|
These services are started synchronously along with RM. This might delay a bit RM transition to active period. This is useful when deploying critical services to get started sooner.
|
||||||
|
|
||||||
|
#### async
|
||||||
|
These services are started asynchronously without impacting RM transition period.
|
||||||
|
|
||||||
|
### Users
|
||||||
|
Users are the owner of the system service who has full access to modify it. Each users can own multiple services. Note that service names are unique per user.
|
||||||
|
|
||||||
|
### Yarnfiles
|
||||||
|
YarnFiles are the spec files to launch services. These files must have .yarnfile extension otherwise those files are ignored.
|
||||||
|
|
||||||
|
### Example of hierarchy to configure system services.
|
||||||
|
|
||||||
|
```
|
||||||
|
SYSTEM_SERVICE_DIR_PATH
|
||||||
|
|---- sync
|
||||||
|
| |--- user1
|
||||||
|
| | |---- service1.yarnfile
|
||||||
|
| | |---- service2.yarnfile
|
||||||
|
| |--- user2
|
||||||
|
| | |---- service3.yarnfile
|
||||||
|
| | ....
|
||||||
|
| |
|
||||||
|
|---- async
|
||||||
|
| |--- user3
|
||||||
|
| | |---- service1.yarnfile
|
||||||
|
| | |---- service2.yarnfile
|
||||||
|
| |--- user4
|
||||||
|
| | |---- service3.yarnfile
|
||||||
|
| | ....
|
||||||
|
| |
|
||||||
|
```
|
Loading…
Reference in New Issue
Block a user