YARN-7202. Add UT for api-server. Contributed by Eric Yang

This commit is contained in:
Jian He 2017-10-12 10:57:35 -07:00
parent b8a7ef1b64
commit b57144a172
7 changed files with 690 additions and 173 deletions

View File

@ -41,7 +41,6 @@
</resources> </resources>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
@ -66,17 +65,6 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<environmentVariables>
<JAVA_HOME>${java.home}</JAVA_HOME>
</environmentVariables>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
@ -84,6 +72,7 @@
</reporting> </reporting>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-services-core</artifactId> <artifactId>hadoop-yarn-services-core</artifactId>
@ -116,5 +105,26 @@
<groupId>javax.ws.rs</groupId> <groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId> <artifactId>jsr311-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<!-- ======================================================== -->
<!-- Test dependencies -->
<!-- ======================================================== -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -165,12 +165,12 @@ private Response stopService(String appName, boolean destroy) {
} else { } else {
LOG.info("Successfully stopped service {}", appName); LOG.info("Successfully stopped service {}", appName);
} }
return Response.status(Status.NO_CONTENT).build(); return Response.status(Status.OK).build();
} catch (ApplicationNotFoundException e) { } catch (ApplicationNotFoundException e) {
ServiceStatus serviceStatus = new ServiceStatus(); ServiceStatus serviceStatus = new ServiceStatus();
serviceStatus.setDiagnostics( serviceStatus.setDiagnostics(
"Service " + appName + " not found " + e.getMessage()); "Service " + appName + " is not found in YARN: " + e.getMessage());
return Response.status(Status.NOT_FOUND).entity(serviceStatus) return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
.build(); .build();
} catch (Exception e) { } catch (Exception e) {
ServiceStatus serviceStatus = new ServiceStatus(); ServiceStatus serviceStatus = new ServiceStatus();
@ -245,7 +245,8 @@ public Response updateService(@PathParam(SERVICE_NAME) String appName,
// flex a single component app // flex a single component app
if (updateServiceData.getNumberOfContainers() != null && !ServiceApiUtil if (updateServiceData.getNumberOfContainers() != null && !ServiceApiUtil
.hasComponent(updateServiceData)) { .hasComponent(updateServiceData)) {
Component defaultComp = ServiceApiUtil.createDefaultComponent(updateServiceData); Component defaultComp = ServiceApiUtil
.createDefaultComponent(updateServiceData);
return updateComponent(updateServiceData.getName(), defaultComp.getName(), return updateComponent(updateServiceData.getName(), defaultComp.getName(),
defaultComp); defaultComp);
} }
@ -291,4 +292,16 @@ private Response startService(String appName) {
.entity(status).build(); .entity(status).build();
} }
} }
/**
* Used by negative test case.
*
* @param mockServerClient - A mocked version of ServiceClient
*/
public static void setServiceClient(ServiceClient mockServerClient) {
SERVICE_CLIENT = mockServerClient;
SERVICE_CLIENT.init(YARN_CONFIG);
SERVICE_CLIENT.start();
}
} }

View File

@ -74,7 +74,7 @@ public ApiServerWebApp() {
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
bindAddress = getConfig().getSocketAddr(API_SERVER_ADDRESS, bindAddress = getConfig().getSocketAddr(API_SERVER_ADDRESS,
DEFAULT_API_SERVER_ADDRESS , DEFAULT_API_SERVER_PORT); DEFAULT_API_SERVER_ADDRESS, DEFAULT_API_SERVER_PORT);
logger.info("YARN API server running on " + bindAddress); logger.info("YARN API server running on " + bindAddress);
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
doSecureLogin(getConfig()); doSecureLogin(getConfig());

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
/**
* A mock version of ServiceClient - This class is design
* to simulate various error conditions that will happen
* when a consumer class calls ServiceClient.
*/
public class ServiceClientTest extends ServiceClient {
private Configuration conf = new Configuration();
protected static void init() {
}
public ServiceClientTest() {
super();
}
@Override
public Configuration getConfig() {
return conf;
}
@Override
public ApplicationId actionCreate(Service service) {
String serviceName = service.getName();
ServiceApiUtil.validateNameFormat(serviceName, getConfig());
return ApplicationId.newInstance(System.currentTimeMillis(), 1);
}
@Override
public Service getStatus(String appName) {
if (appName == null) {
throw new NullPointerException();
}
if (appName.equals("jenkins")) {
return new Service();
} else {
throw new IllegalArgumentException();
}
}
@Override
public int actionStart(String serviceName)
throws YarnException, IOException {
if (serviceName == null) {
throw new NullPointerException();
}
if (serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else {
throw new ApplicationNotFoundException("");
}
}
@Override
public int actionStop(String serviceName, boolean waitForAppStopped)
throws YarnException, IOException {
if (serviceName == null) {
throw new NullPointerException();
}
if (serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else {
throw new ApplicationNotFoundException("");
}
}
@Override
public int actionDestroy(String serviceName) {
if (serviceName == null) {
throw new NullPointerException();
}
if (serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else {
throw new IllegalArgumentException();
}
}
}

View File

@ -0,0 +1,366 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.webapp.ApiServer;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;
/**
* Test case for ApiServer REST API.
*
*/
public class TestApiServer {
private ApiServer apiServer;
@Before
public void setup() throws Exception {
ServiceClient mockServerClient = new ServiceClientTest();
Configuration conf = new Configuration();
conf.set("yarn.api-service.service.client.class",
ServiceClientTest.class.getName());
ApiServer.setServiceClient(mockServerClient);
this.apiServer = new ApiServer(conf);
}
@Test
public void testPathAnnotation() {
assertNotNull(this.apiServer.getClass().getAnnotation(Path.class));
assertTrue("The controller has the annotation Path",
this.apiServer.getClass().isAnnotationPresent(Path.class));
final Path path = this.apiServer.getClass()
.getAnnotation(Path.class);
assertEquals("The path has /ws/v1 annotation", path.value(),
"/ws/v1");
}
@Test
public void testGetVersion() {
final Response actual = apiServer.getVersion();
assertEquals("Version number is", actual.getStatus(),
Response.ok().build().getStatus());
}
@Test
public void testBadCreateService() {
Service service = new Service();
// Test for invalid argument
final Response actual = apiServer.createService(service);
assertEquals("Create service is ", actual.getStatus(),
Response.status(Status.BAD_REQUEST).build().getStatus());
}
@Test
public void testGoodCreateService() {
Service service = new Service();
service.setName("jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
final Response actual = apiServer.createService(service);
assertEquals("Create service is ", actual.getStatus(),
Response.status(Status.ACCEPTED).build().getStatus());
}
@Test
public void testBadGetService() {
final Response actual = apiServer.getService("no-jenkins");
assertEquals("Get service is ", actual.getStatus(),
Response.status(Status.NOT_FOUND).build().getStatus());
}
@Test
public void testBadGetService2() {
final Response actual = apiServer.getService(null);
assertEquals("Get service is ", actual.getStatus(),
Response.status(Status.INTERNAL_SERVER_ERROR)
.build().getStatus());
}
@Test
public void testGoodGetService() {
final Response actual = apiServer.getService("jenkins");
assertEquals("Get service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testBadDeleteService() {
final Response actual = apiServer.deleteService("no-jenkins");
assertEquals("Delete service is ", actual.getStatus(),
Response.status(Status.BAD_REQUEST).build().getStatus());
}
@Test
public void testBadDeleteService2() {
final Response actual = apiServer.deleteService(null);
assertEquals("Delete service is ", actual.getStatus(),
Response.status(Status.INTERNAL_SERVER_ERROR)
.build().getStatus());
}
@Test
public void testGoodDeleteService() {
final Response actual = apiServer.deleteService("jenkins");
assertEquals("Delete service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testDecreaseContainerAndStop() {
Service service = new Service();
service.setState(ServiceState.STOPPED);
service.setName("jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(0L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
final Response actual = apiServer.updateService("jenkins",
service);
assertEquals("update service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testBadDecreaseContainerAndStop() {
Service service = new Service();
service.setState(ServiceState.STOPPED);
service.setName("no-jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("no-jenkins");
c.setNumberOfContainers(-1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
System.out.println("before stop");
final Response actual = apiServer.updateService("no-jenkins",
service);
assertEquals("flex service is ", actual.getStatus(),
Response.status(Status.BAD_REQUEST).build().getStatus());
}
@Test
public void testIncreaseContainersAndStart() {
Service service = new Service();
service.setState(ServiceState.STARTED);
service.setName("jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(2L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
final Response actual = apiServer.updateService("jenkins",
service);
assertEquals("flex service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testBadStartServices() {
Service service = new Service();
service.setState(ServiceState.STARTED);
service.setName("no-jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(2L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
final Response actual = apiServer.updateService("no-jenkins",
service);
assertEquals("start service is ", actual.getStatus(),
Response.status(Status.INTERNAL_SERVER_ERROR).build()
.getStatus());
}
@Test
public void testGoodStartServices() {
Service service = new Service();
service.setState(ServiceState.STARTED);
service.setName("jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(2L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
final Response actual = apiServer.updateService("jenkins",
service);
assertEquals("start service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testBadStopServices() {
Service service = new Service();
service.setState(ServiceState.STOPPED);
service.setName("no-jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("no-jenkins");
c.setNumberOfContainers(-1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
System.out.println("before stop");
final Response actual = apiServer.updateService("no-jenkins",
service);
assertEquals("stop service is ", actual.getStatus(),
Response.status(Status.BAD_REQUEST).build().getStatus());
}
@Test
public void testGoodStopServices() {
Service service = new Service();
service.setState(ServiceState.STARTED);
service.setName("jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(-1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
System.out.println("before stop");
final Response actual = apiServer.updateService("jenkins",
service);
assertEquals("stop service is ", actual.getStatus(),
Response.status(Status.OK).build().getStatus());
}
@Test
public void testUpdateService() {
Service service = new Service();
service.setState(ServiceState.STARTED);
service.setName("no-jenkins");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("no-jenkins");
c.setNumberOfContainers(-1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
System.out.println("before stop");
final Response actual = apiServer.updateService("no-jenkins",
service);
assertEquals("update service is ", actual.getStatus(),
Response.status(Status.INTERNAL_SERVER_ERROR)
.build().getStatus());
}
}

View File

@ -18,24 +18,58 @@
package org.apache.hadoop.yarn.service; package org.apache.hadoop.yarn.service;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.codehaus.jackson.map.PropertyNamingStrategy; import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ServiceTestUtils { public class ServiceTestUtils {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceTestUtils.class);
private MiniYARNCluster yarnCluster = null;
private MiniDFSCluster hdfsCluster = null;
private FileSystem fs = null;
private Configuration conf = null;
public static final int NUM_NMS = 1;
private File basedir;
public static final JsonSerDeser<Service> JSON_SER_DESER = public static final JsonSerDeser<Service> JSON_SER_DESER =
new JsonSerDeser<>(Service.class, new JsonSerDeser<>(Service.class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
@ -84,4 +118,141 @@ public static SliderFileSystem initMockFs(Service ext) throws IOException {
ServiceApiUtil.setJsonSerDeser(jsonSerDeser); ServiceApiUtil.setJsonSerDeser(jsonSerDeser);
return sfs; return sfs;
} }
protected void setConf(YarnConfiguration conf) {
this.conf = conf;
}
protected Configuration getConf() {
return conf;
}
protected FileSystem getFS() {
return fs;
}
protected void setupInternal(int numNodeManager)
throws Exception {
LOG.info("Starting up YARN cluster");
// Logger rootLogger = LogManager.getRootLogger();
// rootLogger.setLevel(Level.DEBUG);
setConf(new YarnConfiguration());
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
// reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
conf.set("yarn.log.dir", "target");
// mark if we need to launch the v1 timeline server
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
// Enable ContainersMonitorImpl
conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class.getName());
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(
YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
conf.setBoolean(TIMELINE_SERVICE_ENABLED, false);
conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
conf.setLong(DEBUG_NM_DELETE_DELAY_SEC, 60000);
conf.setLong(AM_RESOURCE_MEM, 526);
conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 5);
// Disable vmem check to disallow NM killing the container
conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
// setup zk cluster
TestingCluster zkCluster;
zkCluster = new TestingCluster(1);
zkCluster.start();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString());
fs = FileSystem.get(conf);
basedir = new File("target", "apps");
if (basedir.exists()) {
FileUtils.deleteDirectory(basedir);
} else {
basedir.mkdirs();
}
conf.set(YARN_SERVICE_BASE_PATH, basedir.getAbsolutePath());
if (yarnCluster == null) {
yarnCluster =
new MiniYARNCluster(TestYarnNativeServices.class.getSimpleName(), 1,
numNodeManager, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
waitForNMsToRegister();
URL url = Thread.currentThread().getContextClassLoader()
.getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException(
"Could not find 'yarn-site.xml' dummy file in classpath");
}
Configuration yarnClusterConfig = yarnCluster.getConfig();
yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
new File(url.getPath()).getParent());
//write the document to a buffer (not directly to the file, as that
//can cause the file being written to get read -which will then fail.
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
yarnClusterConfig.writeXml(bytesOut);
bytesOut.close();
//write the bytes to the file in the classpath
OutputStream os = new FileOutputStream(new File(url.getPath()));
os.write(bytesOut.toByteArray());
os.close();
LOG.info("Write yarn-site.xml configs to: " + url);
}
if (hdfsCluster == null) {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
.numDataNodes(1).build();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
}
}
public void shutdown() throws IOException {
if (yarnCluster != null) {
try {
yarnCluster.stop();
} finally {
yarnCluster = null;
}
}
if (hdfsCluster != null) {
try {
hdfsCluster.shutdown();
} finally {
hdfsCluster = null;
}
}
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}
SliderFileSystem sfs = new SliderFileSystem(conf);
Path appDir = sfs.getBaseApplicationPath();
sfs.getFileSystem().delete(appDir, true);
}
private void waitForNMsToRegister() throws Exception {
int sec = 60;
while (sec >= 0) {
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
>= NUM_NMS) {
break;
}
Thread.sleep(1000);
sec--;
}
}
} }

View File

@ -18,31 +18,20 @@
package org.apache.hadoop.yarn.service; package org.apache.hadoop.yarn.service;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -52,12 +41,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -66,28 +50,18 @@
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
/** /**
* End to end tests to test deploying services with MiniYarnCluster and a in-JVM * End to end tests to test deploying services with MiniYarnCluster and a in-JVM
* ZK testing cluster. * ZK testing cluster.
*/ */
public class TestYarnNativeServices extends ServiceTestUtils{ public class TestYarnNativeServices extends ServiceTestUtils {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestYarnNativeServices.class); LoggerFactory.getLogger(TestYarnNativeServices.class);
private MiniYARNCluster yarnCluster = null;
private MiniDFSCluster hdfsCluster = null;
private FileSystem fs = null;
protected Configuration conf = null;
private static final int NUM_NMS = 1;
private File basedir;
@Rule @Rule
public TemporaryFolder tmpFolder = new TemporaryFolder(); public TemporaryFolder tmpFolder = new TemporaryFolder();
@ -96,135 +70,11 @@ public void setup() throws Exception {
setupInternal(NUM_NMS); setupInternal(NUM_NMS);
} }
private void setupInternal(int numNodeManager)
throws Exception {
LOG.info("Starting up YARN cluster");
// Logger rootLogger = LogManager.getRootLogger();
// rootLogger.setLevel(Level.DEBUG);
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
// reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
conf.set("yarn.log.dir", "target");
// mark if we need to launch the v1 timeline server
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
// Enable ContainersMonitorImpl
conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class.getName());
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(
YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
conf.setBoolean(TIMELINE_SERVICE_ENABLED, false);
conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
conf.setLong(DEBUG_NM_DELETE_DELAY_SEC, 60000);
conf.setLong(AM_RESOURCE_MEM, 526);
conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 5);
// Disable vmem check to disallow NM killing the container
conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);
// setup zk cluster
TestingCluster zkCluster;
zkCluster = new TestingCluster(1);
zkCluster.start();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
LOG.info("ZK cluster: " + zkCluster.getConnectString());
fs = FileSystem.get(conf);
basedir = new File("target", "apps");
if (basedir.exists()) {
FileUtils.deleteDirectory(basedir);
} else {
basedir.mkdirs();
}
conf.set(YARN_SERVICE_BASE_PATH, basedir.getAbsolutePath());
if (yarnCluster == null) {
yarnCluster =
new MiniYARNCluster(TestYarnNativeServices.class.getSimpleName(), 1,
numNodeManager, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
waitForNMsToRegister();
URL url = Thread.currentThread().getContextClassLoader()
.getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException(
"Could not find 'yarn-site.xml' dummy file in classpath");
}
Configuration yarnClusterConfig = yarnCluster.getConfig();
yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
new File(url.getPath()).getParent());
//write the document to a buffer (not directly to the file, as that
//can cause the file being written to get read -which will then fail.
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
yarnClusterConfig.writeXml(bytesOut);
bytesOut.close();
//write the bytes to the file in the classpath
OutputStream os = new FileOutputStream(new File(url.getPath()));
os.write(bytesOut.toByteArray());
os.close();
LOG.info("Write yarn-site.xml configs to: " + url);
}
if (hdfsCluster == null) {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
.numDataNodes(1).build();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
}
}
private void waitForNMsToRegister() throws Exception {
int sec = 60;
while (sec >= 0) {
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
>= NUM_NMS) {
break;
}
Thread.sleep(1000);
sec--;
}
}
@After @After
public void tearDown() throws IOException { public void tearDown() throws IOException {
if (yarnCluster != null) { shutdown();
try {
yarnCluster.stop();
} finally {
yarnCluster = null;
}
}
if (hdfsCluster != null) {
try {
hdfsCluster.shutdown();
} finally {
hdfsCluster = null;
}
}
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}
SliderFileSystem sfs = new SliderFileSystem(conf);
Path appDir = sfs.getBaseApplicationPath();
sfs.getFileSystem().delete(appDir, true);
} }
// End-to-end test to use ServiceClient to deploy a service. // End-to-end test to use ServiceClient to deploy a service.
// 1. Create a service with 2 components, each of which has 2 containers // 1. Create a service with 2 components, each of which has 2 containers
// 2. Flex up each component to 3 containers and check the component instance names // 2. Flex up each component to 3 containers and check the component instance names
@ -237,11 +87,11 @@ public void testCreateFlexStopDestroyService() throws Exception {
ServiceClient client = createClient(); ServiceClient client = createClient();
Service exampleApp = createExampleApplication(); Service exampleApp = createExampleApplication();
client.actionCreate(exampleApp); client.actionCreate(exampleApp);
SliderFileSystem fileSystem = new SliderFileSystem(conf); SliderFileSystem fileSystem = new SliderFileSystem(getConf());
Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName()); Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName());
// check app.json is persisted. // check app.json is persisted.
Assert.assertTrue( Assert.assertTrue(
fs.exists(new Path(appDir, exampleApp.getName() + ".json"))); getFS().exists(new Path(appDir, exampleApp.getName() + ".json")));
waitForAllCompToBeReady(client, exampleApp); waitForAllCompToBeReady(client, exampleApp);
// Flex two components, each from 2 container to 3 containers. // Flex two components, each from 2 container to 3 containers.
@ -277,7 +127,7 @@ public void testCreateFlexStopDestroyService() throws Exception {
//destroy the service and check the app dir is deleted from fs. //destroy the service and check the app dir is deleted from fs.
client.actionDestroy(exampleApp.getName()); client.actionDestroy(exampleApp.getName());
// check the service dir on hdfs (in this case, local fs) are deleted. // check the service dir on hdfs (in this case, local fs) are deleted.
Assert.assertFalse(fs.exists(appDir)); Assert.assertFalse(getFS().exists(appDir));
} }
// Create compa with 2 containers // Create compa with 2 containers
@ -456,7 +306,7 @@ private ServiceClient createClient() throws Exception {
return null; return null;
} }
}; };
client.init(conf); client.init(getConf());
client.start(); client.start();
return client; return client;
} }