YARN-8160. Support upgrade of service that use docker containers.
Contributed by Chandni Singh
This commit is contained in:
parent
d1d129aa9d
commit
4cba0741d5
@ -635,7 +635,8 @@ public ContainerLaunchService.ComponentLaunchContext createLaunchContext(
|
||||
version);
|
||||
launchContext.setArtifact(compSpec.getArtifact())
|
||||
.setConfiguration(compSpec.getConfiguration())
|
||||
.setLaunchCommand(compSpec.getLaunchCommand());
|
||||
.setLaunchCommand(compSpec.getLaunchCommand())
|
||||
.setRunPrivilegedContainer(compSpec.getRunPrivilegedContainer());
|
||||
return launchContext;
|
||||
}
|
||||
|
||||
|
@ -509,7 +509,7 @@ public NodeId getNodeId() {
|
||||
return this.container.getNodeId();
|
||||
}
|
||||
|
||||
public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
|
||||
private org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
|
||||
return component.getComponentSpec();
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.service.containerlaunch;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
@ -254,4 +255,8 @@ public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
|
||||
this.runPrivilegedContainer = runPrivilegedContainer;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getDockerImage() {
|
||||
return dockerImage;
|
||||
}
|
||||
}
|
||||
|
@ -139,6 +139,7 @@ public static class ComponentLaunchContext {
|
||||
private org.apache.hadoop.yarn.service.api.records.Configuration
|
||||
configuration;
|
||||
private String launchCommand;
|
||||
private boolean runPrivilegedContainer;
|
||||
|
||||
public ComponentLaunchContext(String name, String serviceVersion) {
|
||||
this.name = Preconditions.checkNotNull(name);
|
||||
@ -166,6 +167,10 @@ public String getLaunchCommand() {
|
||||
return launchCommand;
|
||||
}
|
||||
|
||||
public boolean isRunPrivilegedContainer() {
|
||||
return runPrivilegedContainer;
|
||||
}
|
||||
|
||||
public ComponentLaunchContext setArtifact(Artifact artifact) {
|
||||
this.artifact = artifact;
|
||||
return this;
|
||||
@ -181,5 +186,11 @@ public ComponentLaunchContext setLaunchCommand(String launchCommand) {
|
||||
this.launchCommand = launchCommand;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ComponentLaunchContext setRunPrivilegedContainer(
|
||||
boolean runPrivilegedContainer) {
|
||||
this.runPrivilegedContainer = runPrivilegedContainer;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -56,7 +56,8 @@ public abstract class AbstractProviderService implements ProviderService,
|
||||
|
||||
public abstract void processArtifact(AbstractLauncher launcher,
|
||||
ComponentInstance compInstance, SliderFileSystem fileSystem,
|
||||
Service service)
|
||||
Service service,
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchCtx)
|
||||
throws IOException;
|
||||
|
||||
public Map<String, String> buildContainerTokens(ComponentInstance instance,
|
||||
@ -140,7 +141,7 @@ public void buildContainerLaunchContext(AbstractLauncher launcher,
|
||||
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
|
||||
throws IOException, SliderException {
|
||||
processArtifact(launcher, instance, fileSystem, service);
|
||||
processArtifact(launcher, instance, fileSystem, service, compLaunchContext);
|
||||
|
||||
ServiceContext context =
|
||||
instance.getComponent().getScheduler().getContext();
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
|
||||
@ -30,7 +31,8 @@ public class DefaultProviderService extends AbstractProviderService {
|
||||
@Override
|
||||
public void processArtifact(AbstractLauncher launcher,
|
||||
ComponentInstance compInstance, SliderFileSystem fileSystem,
|
||||
Service service)
|
||||
Service service,
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchCtx)
|
||||
throws IOException {
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,6 @@
|
||||
import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
|
||||
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
|
||||
@ -38,34 +37,38 @@
|
||||
public class DockerProviderService extends AbstractProviderService
|
||||
implements DockerKeys {
|
||||
|
||||
@Override
|
||||
public void processArtifact(AbstractLauncher launcher,
|
||||
ComponentInstance compInstance, SliderFileSystem fileSystem,
|
||||
Service service) throws IOException{
|
||||
Service service, ContainerLaunchService.ComponentLaunchContext
|
||||
compLaunchCtx) throws IOException{
|
||||
launcher.setYarnDockerMode(true);
|
||||
launcher.setDockerImage(compInstance.getCompSpec().getArtifact().getId());
|
||||
launcher.setDockerNetwork(compInstance.getCompSpec().getConfiguration()
|
||||
launcher.setDockerImage(compLaunchCtx.getArtifact().getId());
|
||||
launcher.setDockerNetwork(compLaunchCtx.getConfiguration()
|
||||
.getProperty(DOCKER_NETWORK));
|
||||
launcher.setDockerHostname(compInstance.getHostname());
|
||||
launcher.setRunPrivilegedContainer(
|
||||
compInstance.getCompSpec().getRunPrivilegedContainer());
|
||||
compLaunchCtx.isRunPrivilegedContainer());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if system is default to disable docker override or
|
||||
* user requested a Docker container with ENTRY_POINT support.
|
||||
*
|
||||
* @param component - YARN Service component
|
||||
* @param compLaunchContext - launch context for the component.
|
||||
* @return true if Docker launch command override is disabled
|
||||
*/
|
||||
private boolean checkUseEntryPoint(Component component) {
|
||||
private boolean checkUseEntryPoint(
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
|
||||
boolean overrideDisable = false;
|
||||
String overrideDisableKey = Environment.
|
||||
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.
|
||||
name();
|
||||
String overrideDisableValue = (component
|
||||
.getConfiguration().getEnv(overrideDisableKey) != null) ?
|
||||
component.getConfiguration().getEnv(overrideDisableKey) :
|
||||
System.getenv(overrideDisableKey);
|
||||
String overrideDisableValue = (
|
||||
compLaunchContext.getConfiguration().getEnv(overrideDisableKey)
|
||||
!= null) ?
|
||||
compLaunchContext.getConfiguration().getEnv(
|
||||
overrideDisableKey) : System.getenv(overrideDisableKey);
|
||||
overrideDisable = Boolean.parseBoolean(overrideDisableValue);
|
||||
return overrideDisable;
|
||||
}
|
||||
@ -77,10 +80,9 @@ public void buildContainerLaunchCommand(AbstractLauncher launcher,
|
||||
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
|
||||
Map<String, String> tokensForSubstitution)
|
||||
throws IOException, SliderException {
|
||||
Component component = instance.getComponent().getComponentSpec();
|
||||
boolean useEntryPoint = checkUseEntryPoint(component);
|
||||
boolean useEntryPoint = checkUseEntryPoint(compLaunchContext);
|
||||
if (useEntryPoint) {
|
||||
String launchCommand = component.getLaunchCommand();
|
||||
String launchCommand = compLaunchContext.getLaunchCommand();
|
||||
if (!StringUtils.isEmpty(launchCommand)) {
|
||||
launcher.addCommand(launchCommand);
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
|
||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
|
||||
@ -33,9 +34,9 @@ public class TarballProviderService extends AbstractProviderService {
|
||||
@Override
|
||||
public void processArtifact(AbstractLauncher launcher,
|
||||
ComponentInstance instance, SliderFileSystem fileSystem,
|
||||
Service service)
|
||||
throws IOException {
|
||||
Path artifact = new Path(instance.getCompSpec().getArtifact().getId());
|
||||
Service service, ContainerLaunchService.ComponentLaunchContext
|
||||
compLaunchCtx) throws IOException {
|
||||
Path artifact = new Path(compLaunchCtx.getArtifact().getId());
|
||||
if (!fileSystem.isFile(artifact)) {
|
||||
throw new IOException(
|
||||
"Package doesn't exist as a resource: " + artifact);
|
||||
|
@ -101,8 +101,8 @@ public void testContainerReadyAfterUpgrade() throws Exception {
|
||||
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
|
||||
ComponentInstanceEventType.BECOME_READY));
|
||||
Assert.assertEquals("instance not ready", ContainerState.READY,
|
||||
instance.getCompSpec().getContainer(
|
||||
instance.getContainer().getId().toString()).getState());
|
||||
component.getComponentSpec().getContainer(instance.getContainer()
|
||||
.getId().toString()).getState());
|
||||
}
|
||||
|
||||
private void upgradeComponent(Component component) {
|
||||
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service.provider;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.TestServiceManager;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.component.Component;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.provider.docker.DockerProviderService;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Tests for {@link AbstractProviderService}
|
||||
*/
|
||||
public class TestAbstractProviderService {
|
||||
|
||||
private ServiceContext serviceContext;
|
||||
private Service testService;
|
||||
private AbstractLauncher launcher;
|
||||
|
||||
@Rule
|
||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
||||
new ServiceTestUtils.ServiceFSWatcher();
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
testService = TestServiceManager.createBaseDef("testService");
|
||||
serviceContext = new MockRunningServiceContext(rule, testService);
|
||||
launcher = new AbstractLauncher(serviceContext);
|
||||
rule.getFs().setAppDir(new Path("target/testAbstractProviderService"));
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
FileUtils.deleteQuietly(
|
||||
new File(rule.getFs().getAppDir().toUri().getPath()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildContainerLaunchCommand() throws Exception {
|
||||
AbstractProviderService providerService = new DockerProviderService();
|
||||
Component component = serviceContext.scheduler.getAllComponents().entrySet()
|
||||
.iterator().next().getValue();
|
||||
ContainerLaunchService.ComponentLaunchContext clc =
|
||||
createEntryPointCLCFor(testService, component);
|
||||
|
||||
ComponentInstance instance = component.getAllComponentInstances().iterator()
|
||||
.next();
|
||||
Container container = mock(Container.class);
|
||||
providerService.buildContainerLaunchCommand(launcher, testService, instance,
|
||||
rule.getFs(), serviceContext.scheduler.getConfig(), container, clc,
|
||||
null);
|
||||
|
||||
Assert.assertEquals("commands", Lists.newArrayList(clc.getLaunchCommand()),
|
||||
launcher.getCommands());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildContainerLaunchContext() throws Exception {
|
||||
AbstractProviderService providerService = new DockerProviderService();
|
||||
Component component = serviceContext.scheduler.getAllComponents().entrySet()
|
||||
.iterator().next().getValue();
|
||||
ContainerLaunchService.ComponentLaunchContext clc =
|
||||
createEntryPointCLCFor(testService, component);
|
||||
|
||||
ComponentInstance instance = component.getAllComponentInstances().iterator()
|
||||
.next();
|
||||
Container container = mock(Container.class);
|
||||
ContainerId containerId = ContainerId.newContainerId(
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1), 1), 1L);
|
||||
when(container.getId()).thenReturn(containerId);
|
||||
providerService.buildContainerLaunchContext(launcher, testService, instance,
|
||||
rule.getFs(), serviceContext.scheduler.getConfig(), container, clc);
|
||||
|
||||
Assert.assertEquals("artifact", clc.getArtifact().getId(),
|
||||
launcher.getDockerImage());
|
||||
}
|
||||
|
||||
private static ContainerLaunchService.ComponentLaunchContext
|
||||
createEntryPointCLCFor(Service service, Component component) {
|
||||
String launchCmd = "sleep,9000";
|
||||
Artifact artifact = new Artifact();
|
||||
artifact.setType(Artifact.TypeEnum.DOCKER);
|
||||
artifact.setId("example");
|
||||
Map<String, String> env = new HashMap<>();
|
||||
env.put("YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL", "true");
|
||||
env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE", "true");
|
||||
component.getComponentSpec().getConfiguration().setEnv(env);
|
||||
|
||||
return new ContainerLaunchService.ComponentLaunchContext(
|
||||
component.getName(),
|
||||
service.getVersion())
|
||||
.setArtifact(artifact)
|
||||
.setConfiguration(component.getComponentSpec().getConfiguration())
|
||||
.setLaunchCommand(launchCmd);
|
||||
}
|
||||
}
|
@ -43,6 +43,8 @@
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@ -136,6 +138,8 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||
|
||||
protected final LocalDirsHandlerService dirsHandler;
|
||||
|
||||
private final Lock containerExecLock = new ReentrantLock();
|
||||
|
||||
public ContainerLaunch(Context context, Configuration configuration,
|
||||
Dispatcher dispatcher, ContainerExecutor exec, Application app,
|
||||
Container container, LocalDirsHandlerService dirsHandler,
|
||||
@ -486,7 +490,12 @@ protected int launchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
int launchPrep = prepareForLaunch(ctx);
|
||||
if (launchPrep == 0) {
|
||||
return exec.launchContainer(ctx);
|
||||
containerExecLock.lock();
|
||||
try {
|
||||
return exec.launchContainer(ctx);
|
||||
} finally {
|
||||
containerExecLock.unlock();
|
||||
}
|
||||
}
|
||||
return launchPrep;
|
||||
}
|
||||
@ -495,7 +504,12 @@ protected int relaunchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
int launchPrep = prepareForLaunch(ctx);
|
||||
if (launchPrep == 0) {
|
||||
return exec.relaunchContainer(ctx);
|
||||
containerExecLock.lock();
|
||||
try {
|
||||
return exec.relaunchContainer(ctx);
|
||||
} finally {
|
||||
containerExecLock.unlock();
|
||||
}
|
||||
}
|
||||
return launchPrep;
|
||||
}
|
||||
@ -803,18 +817,22 @@ public void cleanupContainer() throws IOException {
|
||||
lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
|
||||
}
|
||||
}
|
||||
|
||||
// Reap the container
|
||||
boolean result = exec.reapContainer(
|
||||
new ContainerReapContext.Builder()
|
||||
.setContainer(container)
|
||||
.setUser(container.getUser())
|
||||
.build());
|
||||
if (!result) {
|
||||
throw new IOException("Reap container failed for container "
|
||||
+ containerIdStr);
|
||||
containerExecLock.lock();
|
||||
try {
|
||||
// Reap the container
|
||||
boolean result = exec.reapContainer(
|
||||
new ContainerReapContext.Builder()
|
||||
.setContainer(container)
|
||||
.setUser(container.getUser())
|
||||
.build());
|
||||
if (!result) {
|
||||
throw new IOException("Reap container failed for container "
|
||||
+ containerIdStr);
|
||||
}
|
||||
cleanupContainerFiles(getContainerWorkDir());
|
||||
} finally {
|
||||
containerExecLock.unlock();
|
||||
}
|
||||
cleanupContainerFiles(getContainerWorkDir());
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user