Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1209169 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-12-01 18:15:46 +00:00
commit 2033b52673
17 changed files with 199 additions and 108 deletions

View File

@ -107,6 +107,13 @@
<directory>${project.build.directory}/site</directory> <directory>${project.build.directory}/site</directory>
<outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory> <outputDirectory>/share/doc/hadoop/${hadoop.component}</outputDirectory>
</fileSet> </fileSet>
<fileSet>
<directory>${basedir}/src/main/native</directory>
<includes>
<include>*.h</include>
</includes>
<outputDirectory>/include</outputDirectory>
</fileSet>
</fileSets> </fileSets>
<dependencySets> <dependencySets>
<dependencySet> <dependencySet>

View File

@ -110,10 +110,6 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
} }
} }
static {
javax.security.auth.login.Configuration.setConfiguration(new KerberosConfiguration());
}
private URL url; private URL url;
private HttpURLConnection conn; private HttpURLConnection conn;
private Base64 base64; private Base64 base64;
@ -187,7 +183,8 @@ private void doSpnegoSequence(AuthenticatedURL.Token token) throws IOException,
Subject subject = Subject.getSubject(context); Subject subject = Subject.getSubject(context);
if (subject == null) { if (subject == null) {
subject = new Subject(); subject = new Subject();
LoginContext login = new LoginContext("", subject); LoginContext login = new LoginContext("", subject,
null, new KerberosConfiguration());
login.login(); login.login();
} }
Subject.doAs(subject, new PrivilegedExceptionAction<Void>() { Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {

View File

@ -97,6 +97,11 @@ Trunk (unreleased changes)
HADOOP-7833. Fix findbugs warnings in protobuf generated code. HADOOP-7833. Fix findbugs warnings in protobuf generated code.
(John Lee via suresh) (John Lee via suresh)
HADOOP-7853. multiple javax security configurations cause conflicts.
(daryn via tucu)
HDFS-2614. hadoop dist tarball is missing hdfs headers. (tucu)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd) HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -147,6 +152,8 @@ Release 0.23.1 - Unreleased
HADOOP-7864. Building mvn site with Maven < 3.0.2 causes OOM errors. HADOOP-7864. Building mvn site with Maven < 3.0.2 causes OOM errors.
(Andrew Bayer via eli) (Andrew Bayer via eli)
HADOOP-7854. UGI getCurrentUser is not synchronized. (Daryn Sharp via jitendra)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -70,7 +70,7 @@ private static KerberosTicket getTgtFromSubject() throws IOException {
if (isOriginalTGT(t)) if (isOriginalTGT(t))
return t; return t;
} }
throw new IOException("Failed to find TGT from current Subject"); throw new IOException("Failed to find TGT from current Subject:"+current);
} }
/** /**

View File

@ -118,18 +118,30 @@ private <T extends Principal> T getCanonicalUser(Class<T> cls) {
@Override @Override
public boolean commit() throws LoginException { public boolean commit() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop login commit");
}
// if we already have a user, we are done. // if we already have a user, we are done.
if (!subject.getPrincipals(User.class).isEmpty()) { if (!subject.getPrincipals(User.class).isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("using existing subject:"+subject.getPrincipals());
}
return true; return true;
} }
Principal user = null; Principal user = null;
// if we are using kerberos, try it out // if we are using kerberos, try it out
if (useKerberos) { if (useKerberos) {
user = getCanonicalUser(KerberosPrincipal.class); user = getCanonicalUser(KerberosPrincipal.class);
if (LOG.isDebugEnabled()) {
LOG.debug("using kerberos user:"+user);
}
} }
// if we don't have a kerberos user, use the OS user // if we don't have a kerberos user, use the OS user
if (user == null) { if (user == null) {
user = getCanonicalUser(OS_PRINCIPAL_CLASS); user = getCanonicalUser(OS_PRINCIPAL_CLASS);
if (LOG.isDebugEnabled()) {
LOG.debug("using local user:"+user);
}
} }
// if we found the user, add our principal // if we found the user, add our principal
if (user != null) { if (user != null) {
@ -148,11 +160,17 @@ public void initialize(Subject subject, CallbackHandler callbackHandler,
@Override @Override
public boolean login() throws LoginException { public boolean login() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop login");
}
return true; return true;
} }
@Override @Override
public boolean logout() throws LoginException { public boolean logout() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop logout");
}
return true; return true;
} }
} }
@ -220,26 +238,6 @@ private static synchronized void initUGI(Configuration conf) {
if (!(groups instanceof TestingGroups)) { if (!(groups instanceof TestingGroups)) {
groups = Groups.getUserToGroupsMappingService(conf); groups = Groups.getUserToGroupsMappingService(conf);
} }
// Set the configuration for JAAS to be the Hadoop configuration.
// This is done here rather than a static initializer to avoid a
// circular dependence.
javax.security.auth.login.Configuration existingConfig = null;
try {
existingConfig =
javax.security.auth.login.Configuration.getConfiguration();
} catch (SecurityException se) {
// If no security configuration is on the classpath, then
// we catch this exception, and we don't need to delegate
// to anyone
}
if (existingConfig instanceof HadoopConfiguration) {
LOG.info("JAAS Configuration already set up for Hadoop, not re-installing.");
} else {
javax.security.auth.login.Configuration.setConfiguration(
new HadoopConfiguration(existingConfig));
}
isInitialized = true; isInitialized = true;
UserGroupInformation.conf = conf; UserGroupInformation.conf = conf;
} }
@ -398,12 +396,6 @@ private static class HadoopConfiguration
private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF = private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN, HADOOP_LOGIN}; new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN, HADOOP_LOGIN};
private final javax.security.auth.login.Configuration parent;
HadoopConfiguration(javax.security.auth.login.Configuration parent) {
this.parent = parent;
}
@Override @Override
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
if (SIMPLE_CONFIG_NAME.equals(appName)) { if (SIMPLE_CONFIG_NAME.equals(appName)) {
@ -414,13 +406,16 @@ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile); KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
KEYTAB_KERBEROS_OPTIONS.put("principal", keytabPrincipal); KEYTAB_KERBEROS_OPTIONS.put("principal", keytabPrincipal);
return KEYTAB_KERBEROS_CONF; return KEYTAB_KERBEROS_CONF;
} else if (parent != null) {
return parent.getAppConfigurationEntry(appName);
} }
return null; return null;
} }
} }
private static LoginContext
newLoginContext(String appName, Subject subject) throws LoginException {
return new LoginContext(appName, subject, null, new HadoopConfiguration());
}
private LoginContext getLogin() { private LoginContext getLogin() {
return user.getLogin(); return user.getLogin();
} }
@ -454,7 +449,8 @@ public boolean hasKerberosCredentials() {
* @return the current user * @return the current user
* @throws IOException if login fails * @throws IOException if login fails
*/ */
public static UserGroupInformation getCurrentUser() throws IOException { public synchronized
static UserGroupInformation getCurrentUser() throws IOException {
AccessControlContext context = AccessController.getContext(); AccessControlContext context = AccessController.getContext();
Subject subject = Subject.getSubject(context); Subject subject = Subject.getSubject(context);
if (subject == null || subject.getPrincipals(User.class).isEmpty()) { if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
@ -476,10 +472,10 @@ static UserGroupInformation getLoginUser() throws IOException {
Subject subject = new Subject(); Subject subject = new Subject();
LoginContext login; LoginContext login;
if (isSecurityEnabled()) { if (isSecurityEnabled()) {
login = new LoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME, login = newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
subject); subject);
} else { } else {
login = new LoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME, login = newLoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME,
subject); subject);
} }
login.login(); login.login();
@ -503,6 +499,9 @@ static UserGroupInformation getLoginUser() throws IOException {
} catch (LoginException le) { } catch (LoginException le) {
throw new IOException("failure to login", le); throw new IOException("failure to login", le);
} }
if (LOG.isDebugEnabled()) {
LOG.debug("UGI loginUser:"+loginUser);
}
} }
return loginUser; return loginUser;
} }
@ -616,7 +615,7 @@ static void loginUserFromKeytab(String user,
long start = 0; long start = 0;
try { try {
login = login =
new LoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject); newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
start = System.currentTimeMillis(); start = System.currentTimeMillis();
login.login(); login.login();
metrics.loginSuccess.add(System.currentTimeMillis() - start); metrics.loginSuccess.add(System.currentTimeMillis() - start);
@ -695,7 +694,7 @@ public synchronized void reloginFromKeytab()
login.logout(); login.logout();
// login and also update the subject field of this instance to // login and also update the subject field of this instance to
// have the new credentials (pass it to the LoginContext constructor) // have the new credentials (pass it to the LoginContext constructor)
login = new LoginContext( login = newLoginContext(
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject()); HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject());
LOG.info("Initiating re-login for " + keytabPrincipal); LOG.info("Initiating re-login for " + keytabPrincipal);
start = System.currentTimeMillis(); start = System.currentTimeMillis();
@ -744,7 +743,7 @@ public synchronized void reloginFromTicketCache()
//login and also update the subject field of this instance to //login and also update the subject field of this instance to
//have the new credentials (pass it to the LoginContext constructor) //have the new credentials (pass it to the LoginContext constructor)
login = login =
new LoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME, newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
getSubject()); getSubject());
LOG.info("Initiating re-login for " + getUserName()); LOG.info("Initiating re-login for " + getUserName());
login.login(); login.login();
@ -781,7 +780,7 @@ static UserGroupInformation loginUserFromKeytabAndReturnUGI(String user,
Subject subject = new Subject(); Subject subject = new Subject();
LoginContext login = LoginContext login =
new LoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject); newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
start = System.currentTimeMillis(); start = System.currentTimeMillis();
login.login(); login.login();
@ -1053,11 +1052,12 @@ public synchronized String[] getGroupNames() {
*/ */
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(getUserName());
sb.append(" (auth:"+getAuthenticationMethod()+")");
if (getRealUser() != null) { if (getRealUser() != null) {
return getUserName() + " via " + getRealUser().toString(); sb.append(" via ").append(getRealUser().toString());
} else {
return getUserName();
} }
return sb.toString();
} }
/** /**
@ -1132,6 +1132,7 @@ protected Subject getSubject() {
* @return the value from the run method * @return the value from the run method
*/ */
public <T> T doAs(PrivilegedAction<T> action) { public <T> T doAs(PrivilegedAction<T> action) {
logPrivilegedAction(subject, action);
return Subject.doAs(subject, action); return Subject.doAs(subject, action);
} }
@ -1149,9 +1150,11 @@ public <T> T doAs(PrivilegedAction<T> action) {
public <T> T doAs(PrivilegedExceptionAction<T> action public <T> T doAs(PrivilegedExceptionAction<T> action
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
try { try {
logPrivilegedAction(subject, action);
return Subject.doAs(subject, action); return Subject.doAs(subject, action);
} catch (PrivilegedActionException pae) { } catch (PrivilegedActionException pae) {
Throwable cause = pae.getCause(); Throwable cause = pae.getCause();
LOG.error("PriviledgedActionException as:"+this+" cause:"+cause);
if (cause instanceof IOException) { if (cause instanceof IOException) {
throw (IOException) cause; throw (IOException) cause;
} else if (cause instanceof Error) { } else if (cause instanceof Error) {
@ -1166,6 +1169,14 @@ public <T> T doAs(PrivilegedExceptionAction<T> action
} }
} }
private void logPrivilegedAction(Subject subject, Object action) {
if (LOG.isDebugEnabled()) {
// would be nice if action included a descriptive toString()
String where = new Throwable().getStackTrace()[2].toString();
LOG.debug("PrivilegedAction as:"+this+" from:"+where);
}
}
private void print() throws IOException { private void print() throws IOException {
System.out.println("User: " + getUserName()); System.out.println("User: " + getUserName());
System.out.print("Group Ids: "); System.out.print("Group Ids: ");

View File

@ -116,8 +116,9 @@ public UserGroupInformation run() throws IOException {
return UserGroupInformation.getCurrentUser(); return UserGroupInformation.getCurrentUser();
} }
}); });
Assert.assertTrue(curUGI.toString().equals( Assert.assertEquals(
PROXY_USER_NAME + " via " + REAL_USER_NAME)); PROXY_USER_NAME + " (auth:PROXY) via " + REAL_USER_NAME + " (auth:SIMPLE)",
curUGI.toString());
} }
@TokenInfo(TestTokenSelector.class) @TokenInfo(TestTokenSelector.class)
@ -174,7 +175,7 @@ public String run() throws IOException {
} }
}); });
Assert.assertEquals(PROXY_USER_NAME + " via " + REAL_USER_NAME, retVal); Assert.assertEquals(PROXY_USER_NAME + " (auth:SIMPLE) via " + REAL_USER_NAME + " (auth:SIMPLE)", retVal);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Assert.fail(); Assert.fail();
@ -216,7 +217,7 @@ public String run() throws IOException {
} }
}); });
Assert.assertEquals(PROXY_USER_NAME + " via " + REAL_USER_NAME, retVal); Assert.assertEquals(PROXY_USER_NAME + " (auth:SIMPLE) via " + REAL_USER_NAME + " (auth:SIMPLE)", retVal);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Assert.fail(); Assert.fail();
@ -446,7 +447,7 @@ public String run() throws Exception {
} }
}); });
//The user returned by server must be the one in the token. //The user returned by server must be the one in the token.
Assert.assertEquals(REAL_USER_NAME + " via SomeSuperUser", retVal); Assert.assertEquals(REAL_USER_NAME + " (auth:TOKEN) via SomeSuperUser (auth:SIMPLE)", retVal);
} }
/* /*
@ -498,7 +499,7 @@ public String run() throws Exception {
} }
} }
}); });
String expected = REAL_USER_NAME + " via SomeSuperUser"; String expected = REAL_USER_NAME + " (auth:TOKEN) via SomeSuperUser (auth:SIMPLE)";
Assert.assertEquals(retVal + "!=" + expected, expected, retVal); Assert.assertEquals(retVal + "!=" + expected, expected, retVal);
} }

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.security; package org.apache.hadoop.security;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.junit.*;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -32,9 +34,6 @@
import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginContext;
import junit.framework.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@ -50,17 +49,32 @@ public class TestUserGroupInformation {
final private static String[] GROUP_NAMES = final private static String[] GROUP_NAMES =
new String[]{GROUP1_NAME, GROUP2_NAME, GROUP3_NAME}; new String[]{GROUP1_NAME, GROUP2_NAME, GROUP3_NAME};
private static javax.security.auth.login.Configuration mockJaasConf; /**
* UGI should not use the default security conf, else it will collide
static { * with other classes that may change the default conf. Using this dummy
setupMockJaasParent(); * class that simply throws an exception will ensure that the tests fail
* if UGI uses the static default config instead of its own config
*/
private static class DummyLoginConfiguration extends
javax.security.auth.login.Configuration
{
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
throw new RuntimeException("UGI is not using its own security conf!");
}
}
/** configure ugi */
@BeforeClass
public static void setup() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("hadoop.security.auth_to_local", conf.set("hadoop.security.auth_to_local",
"RULE:[2:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" + "RULE:[2:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" +
"RULE:[1:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" "RULE:[1:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//"
+ "DEFAULT"); + "DEFAULT");
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
javax.security.auth.login.Configuration.setConfiguration(
new DummyLoginConfiguration());
} }
/** Test login method */ /** Test login method */
@ -351,37 +365,6 @@ public static void verifyLoginMetrics(long success, int failure)
} }
} }
/**
* Setup a JAAS Configuration that handles a fake app.
* This runs before UserGroupInformation has been initialized,
* so UGI picks up this Configuration as the parent.
*/
private static void setupMockJaasParent() {
javax.security.auth.login.Configuration existing = null;
try {
existing =javax.security.auth.login.Configuration.getConfiguration();
assertFalse("setupMockJaasParent should run before the Hadoop " +
"configuration provider is installed.",
existing.getClass().getCanonicalName()
.startsWith("org.apache.hadoop"));
} catch (SecurityException se) {
// We get this if no configuration has been set. So it's OK.
}
mockJaasConf = mock(javax.security.auth.login.Configuration.class);
Mockito.doReturn(new AppConfigurationEntry[] {})
.when(mockJaasConf)
.getAppConfigurationEntry("foobar-app");
javax.security.auth.login.Configuration.setConfiguration(mockJaasConf);
}
@Test
public void testDelegateJaasConfiguration() throws Exception {
// This will throw if the Configuration doesn't have any entries
// for "foobar"
LoginContext login = new LoginContext("foobar-app");
}
/** /**
* Test for the case that UserGroupInformation.getCurrentUser() * Test for the case that UserGroupInformation.getCurrentUser()
* is called when the AccessControlContext has a Subject associated * is called when the AccessControlContext has a Subject associated

View File

@ -66,6 +66,8 @@ Trunk (unreleased changes)
MAPREDUCE-3014. Rename and invert logic of '-cbuild' profile to 'native' and off MAPREDUCE-3014. Rename and invert logic of '-cbuild' profile to 'native' and off
by default. (tucu) by default. (tucu)
MAPREDUCE-3477. Hadoop site documentation cannot be built anymore. (jeagles via tucu)
Release 0.23.1 - Unreleased Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -200,6 +202,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3450. NM port info no longer available in JobHistory. MAPREDUCE-3450. NM port info no longer available in JobHistory.
(Siddharth Seth via mahadev) (Siddharth Seth via mahadev)
MAPREDUCE-3488. Streaming jobs are failing because the main class
isnt set in the pom files. (mahadev)
MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with
java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev)
MAPREDUCE-3452. fifoscheduler web ui page always shows 0% used for the queue.
(Jonathan Eagles via mahadev)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -217,8 +217,7 @@ public void init(final Configuration conf) {
&& appAttemptID.getAttemptId() > 1) { && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. " LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis."); + "Will try to recover from previous life on best effort basis.");
recoveryServ = new RecoveryService(appAttemptID, clock, recoveryServ = createRecoveryService(context);
committer);
addIfService(recoveryServ); addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher(); dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock(); clock = recoveryServ.getClock();
@ -425,6 +424,15 @@ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
return new JobFinishEventHandler(); return new JobFinishEventHandler();
} }
/**
* Create the recovery service.
* @return an instance of the recovery service.
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
appContext.getClock(), getCommitter());
}
/** Create and initialize (but don't start) a single job. */ /** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) { protected Job createJob(Configuration conf) {

View File

@ -76,8 +76,6 @@
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
@ -97,8 +95,6 @@
public class RecoveryService extends CompositeService implements Recovery { public class RecoveryService extends CompositeService implements Recovery {
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(RecoveryService.class); private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
@ -120,7 +116,7 @@ public RecoveryService(ApplicationAttemptId applicationAttemptId,
super("RecoveringDispatcher"); super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId; this.applicationAttemptId = applicationAttemptId;
this.committer = committer; this.committer = committer;
this.dispatcher = new RecoveryDispatcher(); this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock); this.clock = new ControlledClock(clock);
addService((Service) dispatcher); addService((Service) dispatcher);
} }
@ -209,17 +205,32 @@ private void parse() throws IOException {
LOG.info("Read completed tasks from history " LOG.info("Read completed tasks from history "
+ completedTasks.size()); + completedTasks.size());
} }
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
}
protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
return new RecoveryDispatcher(exitOnException);
}
@SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher { class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler; private final EventHandler actualHandler;
private final EventHandler handler; private final EventHandler handler;
RecoveryDispatcher() { RecoveryDispatcher(boolean exitOnException) {
super(exitOnException);
actualHandler = super.getEventHandler(); actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler); handler = new InterceptingEventHandler(actualHandler);
} }
RecoveryDispatcher() {
this(false);
}
@Override @Override
@SuppressWarnings("unchecked")
public void dispatch(Event event) { public void dispatch(Event event) {
if (recoveryMode) { if (recoveryMode) {
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
@ -267,6 +278,10 @@ else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
} }
} }
} }
realDispatch(event);
}
public void realDispatch(Event event) {
super.dispatch(event); super.dispatch(event);
} }
@ -281,6 +296,7 @@ private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
} }
@SuppressWarnings({"rawtypes", "unchecked"})
private class InterceptingEventHandler implements EventHandler { private class InterceptingEventHandler implements EventHandler {
EventHandler actualHandler; EventHandler actualHandler;
@ -407,7 +423,9 @@ private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
LOG.info("Sending assigned event to " + yarnAttemptID); LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId(); ContainerId cId = attemptInfo.getContainerId();
NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname()); NodeId nodeId =
ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+ attemptInfo.getPort());
// Resource/Priority/ApplicationACLs are only needed while launching the // Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them // container on an NM, these are already completed tasks, so setting them
// to null // to null

View File

@ -52,7 +52,12 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test; import org.junit.Test;
@ -407,6 +412,13 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
} }
@Override
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryServiceWithCustomDispatcher(
appContext.getApplicationAttemptId(), appContext.getClock(),
getCommitter());
}
@Override @Override
protected ContainerLauncher createContainerLauncher(AppContext context) { protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher(); MockContainerLauncher launcher = new MockContainerLauncher();
@ -422,7 +434,22 @@ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
return eventHandler; return eventHandler;
} }
} }
class RecoveryServiceWithCustomDispatcher extends RecoveryService {
public RecoveryServiceWithCustomDispatcher(
ApplicationAttemptId applicationAttemptId, Clock clock,
OutputCommitter committer) {
super(applicationAttemptId, clock, committer);
}
@Override
public Dispatcher createRecoveryDispatcher() {
return super.createRecoveryDispatcher(false);
}
}
public static void main(String[] arg) throws Exception { public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery(); TestRecovery test = new TestRecovery();
test.testCrashed(); test.testCrashed();

View File

@ -45,18 +45,25 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private Thread eventHandlingThread; private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException;
public AsyncDispatcher() { public AsyncDispatcher() {
this(new HashMap<Class<? extends Enum>, EventHandler>(), this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>()); new LinkedBlockingQueue<Event>(), true);
}
public AsyncDispatcher(boolean exitOnException) {
this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>(), exitOnException);
} }
AsyncDispatcher( AsyncDispatcher(
Map<Class<? extends Enum>, EventHandler> eventDispatchers, Map<Class<? extends Enum>, EventHandler> eventDispatchers,
BlockingQueue<Event> eventQueue) { BlockingQueue<Event> eventQueue, boolean exitOnException) {
super("Dispatcher"); super("Dispatcher");
this.eventQueue = eventQueue; this.eventQueue = eventQueue;
this.eventDispatchers = eventDispatchers; this.eventDispatchers = eventDispatchers;
this.exitOnDispatchException = exitOnException;
} }
Runnable createThread() { Runnable createThread() {
@ -118,7 +125,9 @@ protected void dispatch(Event event) {
catch (Throwable t) { catch (Throwable t) {
//TODO Maybe log the state of the queue //TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread. Exiting..", t); LOG.fatal("Error in dispatcher thread. Exiting..", t);
System.exit(-1); if (exitOnDispatchException) {
System.exit(-1);
}
} }
} }

View File

@ -36,7 +36,7 @@ public DrainDispatcher() {
} }
private DrainDispatcher(BlockingQueue<Event> eventQueue) { private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue); super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
this.queue = eventQueue; this.queue = eventQueue;
} }

View File

@ -145,8 +145,9 @@ public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) { boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName()); queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(100.0f); queueInfo.setCapacity(1.0f);
queueInfo.setMaximumCapacity(100.0f); queueInfo.setCurrentCapacity((float)usedResource.getMemory() / clusterResource.getMemory());
queueInfo.setMaximumCapacity(1.0f);
queueInfo.setChildQueues(new ArrayList<QueueInfo>()); queueInfo.setChildQueues(new ArrayList<QueueInfo>());
queueInfo.setQueueState(QueueState.RUNNING); queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo; return queueInfo;

View File

@ -123,8 +123,8 @@ public void render(Block html) {
span().$style(Q_END)._("100% ")._(). span().$style(Q_END)._("100% ")._().
span(".q", "default")._()._(); span(".q", "default")._()._();
} else { } else {
float used = qInfo.getCurrentCapacity() / 100.0f; float used = qInfo.getCurrentCapacity();
float set = qInfo.getCapacity() / 100.0f; float set = qInfo.getCapacity();
float delta = Math.abs(set - used) + 0.001f; float delta = Math.abs(set - used) + 0.001f;
ul. ul.
li(). li().

View File

@ -145,7 +145,7 @@ Add the following configs to your <<<yarn-site.xml>>>
</property> </property>
+---+ +---+
** Setting up <<<capacity-scheduler.xml>>> * Setting up <<<capacity-scheduler.xml>>>
Make sure you populate the root queues in <<<capacity-scheduler.xml>>>. Make sure you populate the root queues in <<<capacity-scheduler.xml>>>.

View File

@ -116,6 +116,17 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.streaming.HadoopStreaming</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>