YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1585783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
38e011f4b1
commit
245012a9d9
@ -21,6 +21,8 @@ Release 2.5.0 - UNRELEASED
|
|||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
||||||
|
@ -884,6 +884,13 @@ public class YarnConfiguration extends Configuration {
|
|||||||
|
|
||||||
public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
|
public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
|
||||||
|
|
||||||
|
/** Prefix shared by the NodeManager recovery configuration keys below. */
public static final String NM_RECOVERY_PREFIX = NM_PREFIX + "recovery.";
|
||||||
|
/** Key of the boolean setting that enables NodeManager recovery. */
public static final String NM_RECOVERY_ENABLED =
|
||||||
|
NM_RECOVERY_PREFIX + "enabled";
|
||||||
|
/** Value used for {@link #NM_RECOVERY_ENABLED} when it is not configured. */
public static final boolean DEFAULT_NM_RECOVERY_ENABLED = false;
|
||||||
|
|
||||||
|
/**
 * Key of the local filesystem directory in which the NodeManager stores
 * state when recovery is enabled.
 */
public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Web Proxy Configs
|
// Web Proxy Configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
|
@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
@ -38,10 +39,21 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||||||
@Evolving
|
@Evolving
|
||||||
public abstract class AuxiliaryService extends AbstractService {
|
public abstract class AuxiliaryService extends AbstractService {
|
||||||
|
|
||||||
|
// Location for this service's recoverable state; null unless a path is
// supplied through setRecoveryPath(Path).
private Path recoveryPath = null;
|
||||||
|
|
||||||
protected AuxiliaryService(String name) {
|
protected AuxiliaryService(String name) {
|
||||||
super(name);
|
super(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the path specific to this auxiliary service to use for recovery.
* This is the value given to {@link #setRecoveryPath(Path)}, or null if
* that method has not been called.
|
||||||
|
*
|
||||||
|
* @return state storage path or null if recovery is not enabled
|
||||||
|
*/
|
||||||
|
protected Path getRecoveryPath() {
|
||||||
|
return recoveryPath;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A new application is started on this NodeManager. This is a signal to
|
* A new application is started on this NodeManager. This is a signal to
|
||||||
* this {@link AuxiliaryService} about the application initialization.
|
* this {@link AuxiliaryService} about the application initialization.
|
||||||
@ -102,4 +114,13 @@ public abstract class AuxiliaryService extends AbstractService {
|
|||||||
public void stopContainer(ContainerTerminationContext stopContainerContext) {
|
public void stopContainer(ContainerTerminationContext stopContainerContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the path for this auxiliary service to use for storing state
|
||||||
|
* that will be used during recovery.
* The stored value is what {@link #getRecoveryPath()} subsequently returns.
|
||||||
|
*
|
||||||
|
* @param recoveryPath where recoverable state should be stored
|
||||||
|
*/
|
||||||
|
public void setRecoveryPath(Path recoveryPath) {
|
||||||
|
this.recoveryPath = recoveryPath;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1019,6 +1019,19 @@
|
|||||||
<value>500</value>
|
<value>500</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Enable the node manager to recover after starting</description>
|
||||||
|
<name>yarn.nodemanager.recovery.enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The local filesystem directory in which the node manager will
|
||||||
|
store state when recovery is enabled.</description>
|
||||||
|
<name>yarn.nodemanager.recovery.dir</name>
|
||||||
|
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!--Map Reduce configuration-->
|
<!--Map Reduce configuration-->
|
||||||
<property>
|
<property>
|
||||||
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
|
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
|
||||||
|
@ -28,6 +28,9 @@ import org.apache.commons.logging.Log;
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
@ -127,6 +130,20 @@ public class NodeManager extends CompositeService
|
|||||||
|
|
||||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
||||||
|
|
||||||
|
boolean recoveryEnabled = conf.getBoolean(
|
||||||
|
YarnConfiguration.NM_RECOVERY_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
|
||||||
|
if (recoveryEnabled) {
|
||||||
|
FileSystem recoveryFs = FileSystem.getLocal(conf);
|
||||||
|
String recoveryDirName = conf.get(YarnConfiguration.NM_RECOVERY_DIR);
|
||||||
|
if (recoveryDirName == null) {
|
||||||
|
throw new IllegalArgumentException("Recovery is enabled but " +
|
||||||
|
YarnConfiguration.NM_RECOVERY_DIR + " is not set.");
|
||||||
|
}
|
||||||
|
Path recoveryRoot = new Path(recoveryDirName);
|
||||||
|
recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
|
||||||
|
}
|
||||||
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager =
|
NMContainerTokenSecretManager containerTokenSecretManager =
|
||||||
new NMContainerTokenSecretManager(conf);
|
new NMContainerTokenSecretManager(conf);
|
||||||
|
|
||||||
|
@ -29,15 +29,18 @@ import java.util.regex.Pattern;
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.service.ServiceStateChangeListener;
|
import org.apache.hadoop.service.ServiceStateChangeListener;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
||||||
|
|
||||||
@ -46,6 +49,8 @@ import com.google.common.base.Preconditions;
|
|||||||
public class AuxServices extends AbstractService
|
public class AuxServices extends AbstractService
|
||||||
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
|
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
|
||||||
|
|
||||||
|
// Name of the directory under the NM recovery dir that holds one state
// subdirectory per configured auxiliary service.
static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AuxServices.class);
|
private static final Log LOG = LogFactory.getLog(AuxServices.class);
|
||||||
|
|
||||||
protected final Map<String,AuxiliaryService> serviceMap;
|
protected final Map<String,AuxiliaryService> serviceMap;
|
||||||
@ -91,6 +96,17 @@ public class AuxServices extends AbstractService
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
final FsPermission storeDirPerms = new FsPermission((short)0700);
|
||||||
|
Path stateStoreRoot = null;
|
||||||
|
FileSystem stateStoreFs = null;
|
||||||
|
boolean recoveryEnabled = conf.getBoolean(
|
||||||
|
YarnConfiguration.NM_RECOVERY_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
|
||||||
|
if (recoveryEnabled) {
|
||||||
|
stateStoreRoot = new Path(conf.get(YarnConfiguration.NM_RECOVERY_DIR),
|
||||||
|
STATE_STORE_ROOT_NAME);
|
||||||
|
stateStoreFs = FileSystem.getLocal(conf);
|
||||||
|
}
|
||||||
Collection<String> auxNames = conf.getStringCollection(
|
Collection<String> auxNames = conf.getStringCollection(
|
||||||
YarnConfiguration.NM_AUX_SERVICES);
|
YarnConfiguration.NM_AUX_SERVICES);
|
||||||
for (final String sName : auxNames) {
|
for (final String sName : auxNames) {
|
||||||
@ -119,6 +135,11 @@ public class AuxServices extends AbstractService
|
|||||||
+"the name in the config.");
|
+"the name in the config.");
|
||||||
}
|
}
|
||||||
addService(sName, s);
|
addService(sName, s);
|
||||||
|
if (recoveryEnabled) {
|
||||||
|
Path storePath = new Path(stateStoreRoot, sName);
|
||||||
|
stateStoreFs.mkdirs(storePath, storeDirPerms);
|
||||||
|
s.setRecoveryPath(storePath);
|
||||||
|
}
|
||||||
s.init(conf);
|
s.init(conf);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
LOG.fatal("Failed to initialize " + sName, e);
|
LOG.fatal("Failed to initialize " + sName, e);
|
||||||
|
@ -26,6 +26,8 @@ import static org.junit.Assert.assertNull;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -36,6 +38,10 @@ import org.junit.Assert;
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
@ -56,6 +62,10 @@ import org.junit.Test;
|
|||||||
|
|
||||||
public class TestAuxServices {
|
public class TestAuxServices {
|
||||||
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
|
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
|
||||||
|
// Scratch directory for this test class: named after the class and placed
// under test.build.data, or under java.io.tmpdir if that property is unset.
private static final File TEST_DIR = new File(
|
||||||
|
System.getProperty("test.build.data",
|
||||||
|
System.getProperty("java.io.tmpdir")),
|
||||||
|
TestAuxServices.class.getName());
|
||||||
|
|
||||||
static class LightService extends AuxiliaryService implements Service
|
static class LightService extends AuxiliaryService implements Service
|
||||||
{
|
{
|
||||||
@ -319,4 +329,81 @@ public class TestAuxServices {
|
|||||||
"should only contain a-zA-Z0-9_ and can not start with numbers"));
|
"should only contain a-zA-Z0-9_ and can not start with numbers"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAuxServiceRecoverySetup() throws IOException {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.NM_RECOVERY_DIR, TEST_DIR.toString());
|
||||||
|
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
|
||||||
|
new String[] { "Asrv", "Bsrv" });
|
||||||
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
|
||||||
|
RecoverableServiceA.class, Service.class);
|
||||||
|
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
|
||||||
|
RecoverableServiceB.class, Service.class);
|
||||||
|
try {
|
||||||
|
final AuxServices aux = new AuxServices();
|
||||||
|
aux.init(conf);
|
||||||
|
Assert.assertEquals(2, aux.getServices().size());
|
||||||
|
File auxStorageDir = new File(TEST_DIR,
|
||||||
|
AuxServices.STATE_STORE_ROOT_NAME);
|
||||||
|
Assert.assertEquals(2, auxStorageDir.listFiles().length);
|
||||||
|
aux.close();
|
||||||
|
} finally {
|
||||||
|
FileUtil.fullyDelete(TEST_DIR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class RecoverableAuxService extends AuxiliaryService {
|
||||||
|
static final FsPermission RECOVERY_PATH_PERMS =
|
||||||
|
new FsPermission((short)0700);
|
||||||
|
|
||||||
|
String auxName;
|
||||||
|
|
||||||
|
RecoverableAuxService(String name, String auxName) {
|
||||||
|
super(name);
|
||||||
|
this.auxName = auxName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
super.serviceInit(conf);
|
||||||
|
Path storagePath = getRecoveryPath();
|
||||||
|
Assert.assertNotNull("Recovery path not present when aux service inits",
|
||||||
|
storagePath);
|
||||||
|
Assert.assertTrue(storagePath.toString().contains(auxName));
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
Assert.assertTrue("Recovery path does not exist",
|
||||||
|
fs.exists(storagePath));
|
||||||
|
Assert.assertEquals("Recovery path has wrong permissions",
|
||||||
|
new FsPermission((short)0700),
|
||||||
|
fs.getFileStatus(storagePath).getPermission());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initializeApplication(
|
||||||
|
ApplicationInitializationContext initAppContext) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopApplication(ApplicationTerminationContext stopAppContext) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getMetaData() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Recoverable test service that the recovery test configures as "Asrv". */
static class RecoverableServiceA extends RecoverableAuxService {
|
||||||
|
RecoverableServiceA() {
|
||||||
|
// The second argument is the name the parent expects to find in the
// recovery path.
super("RecoverableServiceA", "Asrv");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Recoverable test service that the recovery test configures as "Bsrv". */
static class RecoverableServiceB extends RecoverableAuxService {
|
||||||
|
RecoverableServiceB() {
|
||||||
|
// The second argument is the name the parent expects to find in the
// recovery path.
super("RecoverableServiceB", "Bsrv");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user