Merge branch 'trunk' into HDFS-6581
This commit is contained in:
commit
dde2ed13c2
@ -8,7 +8,7 @@ Requirements:
|
||||
* Maven 3.0 or later
|
||||
* Findbugs 1.3.9 (if running findbugs)
|
||||
* ProtocolBuffer 2.5.0
|
||||
* CMake 2.6 or newer (if compiling native code)
|
||||
* CMake 2.6 or newer (if compiling native code), must be 3.0 or newer on Mac
|
||||
* Zlib devel (if compiling native code)
|
||||
* openssl devel ( if compiling native hadoop-pipes )
|
||||
* Internet connection for first build (to fetch all Maven and Hadoop dependencies)
|
||||
|
@ -339,6 +339,21 @@ Trunk (Unreleased)
|
||||
|
||||
HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
|
||||
|
||||
Release 2.7.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-11156. DelegateToFileSystem should implement
|
||||
getFsStatus(final Path f). (Zhihai Xu via wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 2.6.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
@ -765,6 +780,9 @@ Release 2.6.0 - UNRELEASED
|
||||
|
||||
HADOOP-11130. NFS updateMaps OS check is reversed (brandonli)
|
||||
|
||||
HADOOP-11113. Namenode not able to reconnect to KMS after KMS restart.
|
||||
(Arun Suresh via wang)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HADOOP-10734. Implement high-performance secure random number sources.
|
||||
@ -895,6 +913,14 @@ Release 2.6.0 - UNRELEASED
|
||||
HADOOP-11049. javax package system class default is too broad (Sangjin Lee
|
||||
via jlowe)
|
||||
|
||||
HADOOP-11154. Update BUILDING.txt to state that CMake 3.0 or newer is
|
||||
required on Mac. (cnauroth)
|
||||
|
||||
HADOOP-11145. TestFairCallQueue fails. (Akira AJISAKA via cnauroth)
|
||||
|
||||
HADOOP-11117 UGI HadoopLoginModule doesn't catch & wrap all
|
||||
kerberos-related exceptions (stevel)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -415,7 +415,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
return conn;
|
||||
}
|
||||
|
||||
private static <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
private <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
int expectedResponse, Class<T> klass)
|
||||
throws IOException {
|
||||
T ret = null;
|
||||
@ -427,6 +427,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
conn.getInputStream().close();
|
||||
throw ex;
|
||||
}
|
||||
if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
|
||||
// Ideally, this should happen only when there is an Authentication
|
||||
// failure. Unfortunately, the AuthenticationFilter returns 403 when it
|
||||
// cannot authenticate (Since a 401 requires Server to send
|
||||
// WWW-Authenticate header as well)..
|
||||
KMSClientProvider.this.authToken =
|
||||
new DelegationTokenAuthenticatedURL.Token();
|
||||
}
|
||||
HttpExceptionUtils.validateResponse(conn, expectedResponse);
|
||||
if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())
|
||||
&& klass != null) {
|
||||
|
@ -128,6 +128,11 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
|
||||
return fsImpl.getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsStatus getFsStatus(final Path f) throws IOException {
|
||||
return fsImpl.getStatus(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsServerDefaults getServerDefaults() throws IOException {
|
||||
return fsImpl.getServerDefaults();
|
||||
|
@ -47,7 +47,8 @@ class User implements Principal {
|
||||
try {
|
||||
shortName = new HadoopKerberosName(name).getShortName();
|
||||
} catch (IOException ioe) {
|
||||
throw new IllegalArgumentException("Illegal principal name " + name, ioe);
|
||||
throw new IllegalArgumentException("Illegal principal name " + name
|
||||
+": " + ioe.toString(), ioe);
|
||||
}
|
||||
fullName = name;
|
||||
|
||||
|
@ -178,7 +178,21 @@ public class UserGroupInformation {
|
||||
}
|
||||
// if we found the user, add our principal
|
||||
if (user != null) {
|
||||
subject.getPrincipals().add(new User(user.getName()));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
|
||||
}
|
||||
|
||||
User userEntry = null;
|
||||
try {
|
||||
userEntry = new User(user.getName());
|
||||
} catch (Exception e) {
|
||||
throw (LoginException)(new LoginException(e.toString()).initCause(e));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
|
||||
}
|
||||
|
||||
subject.getPrincipals().add(userEntry);
|
||||
return true;
|
||||
}
|
||||
LOG.error("Can't find user in " + subject);
|
||||
@ -931,7 +945,7 @@ public class UserGroupInformation {
|
||||
metrics.loginFailure.add(Time.now() - start);
|
||||
}
|
||||
throw new IOException("Login failure for " + user + " from keytab " +
|
||||
path, le);
|
||||
path+ ": " + le, le);
|
||||
}
|
||||
LOG.info("Login successful for user " + keytabPrincipal
|
||||
+ " using keytab file " + keytabFile);
|
||||
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
@ -243,11 +244,14 @@ public class TestFairCallQueue extends TestCase {
|
||||
public final String tag;
|
||||
public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted
|
||||
private final int maxCalls;
|
||||
private final CountDownLatch latch;
|
||||
|
||||
public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) {
|
||||
public Putter(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
|
||||
CountDownLatch latch) {
|
||||
this.maxCalls = maxCalls;
|
||||
this.cq = aCq;
|
||||
this.tag = tag;
|
||||
this.latch = latch;
|
||||
}
|
||||
|
||||
private String getTag() {
|
||||
@ -262,6 +266,7 @@ public class TestFairCallQueue extends TestCase {
|
||||
while (callsAdded < maxCalls || maxCalls < 0) {
|
||||
cq.put(mockCall(getTag()));
|
||||
callsAdded++;
|
||||
latch.countDown();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
@ -280,14 +285,17 @@ public class TestFairCallQueue extends TestCase {
|
||||
public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted
|
||||
public volatile Schedulable lastResult = null; // the last thing we took
|
||||
private final int maxCalls; // maximum calls to take
|
||||
private final CountDownLatch latch;
|
||||
|
||||
private IdentityProvider uip;
|
||||
|
||||
public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag) {
|
||||
public Taker(BlockingQueue<Schedulable> aCq, int maxCalls, String tag,
|
||||
CountDownLatch latch) {
|
||||
this.maxCalls = maxCalls;
|
||||
this.cq = aCq;
|
||||
this.tag = tag;
|
||||
this.uip = new UserIdentityProvider();
|
||||
this.latch = latch;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -303,6 +311,7 @@ public class TestFairCallQueue extends TestCase {
|
||||
cq.put(res);
|
||||
} else {
|
||||
callsTaken++;
|
||||
latch.countDown();
|
||||
lastResult = res;
|
||||
}
|
||||
}
|
||||
@ -316,10 +325,11 @@ public class TestFairCallQueue extends TestCase {
|
||||
public void assertCanTake(BlockingQueue<Schedulable> cq, int numberOfTakes,
|
||||
int takeAttempts) throws InterruptedException {
|
||||
|
||||
Taker taker = new Taker(cq, takeAttempts, "default");
|
||||
CountDownLatch latch = new CountDownLatch(numberOfTakes);
|
||||
Taker taker = new Taker(cq, takeAttempts, "default", latch);
|
||||
Thread t = new Thread(taker);
|
||||
t.start();
|
||||
t.join(100);
|
||||
latch.await();
|
||||
|
||||
assertEquals(numberOfTakes, taker.callsTaken);
|
||||
t.interrupt();
|
||||
@ -329,10 +339,11 @@ public class TestFairCallQueue extends TestCase {
|
||||
public void assertCanPut(BlockingQueue<Schedulable> cq, int numberOfPuts,
|
||||
int putAttempts) throws InterruptedException {
|
||||
|
||||
Putter putter = new Putter(cq, putAttempts, null);
|
||||
CountDownLatch latch = new CountDownLatch(numberOfPuts);
|
||||
Putter putter = new Putter(cq, putAttempts, null, latch);
|
||||
Thread t = new Thread(putter);
|
||||
t.start();
|
||||
t.join(100);
|
||||
latch.await();
|
||||
|
||||
assertEquals(numberOfPuts, putter.callsAdded);
|
||||
t.interrupt();
|
||||
|
@ -340,7 +340,8 @@ public class TestUserGroupInformation {
|
||||
} catch (IllegalArgumentException e) {
|
||||
String expect = (userName == null || userName.isEmpty())
|
||||
? "Null user" : "Illegal principal name "+userName;
|
||||
assertEquals(expect, e.getMessage());
|
||||
assertTrue("Did not find "+ expect + " in " + e,
|
||||
e.toString().contains(expect));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,12 +43,12 @@ import java.util.UUID;
|
||||
|
||||
public class MiniKMS {
|
||||
|
||||
private static Server createJettyServer(String keyStore, String password) {
|
||||
private static Server createJettyServer(String keyStore, String password, int inPort) {
|
||||
try {
|
||||
boolean ssl = keyStore != null;
|
||||
InetAddress localhost = InetAddress.getByName("localhost");
|
||||
String host = "localhost";
|
||||
ServerSocket ss = new ServerSocket(0, 50, localhost);
|
||||
ServerSocket ss = new ServerSocket((inPort < 0) ? 0 : inPort, 50, localhost);
|
||||
int port = ss.getLocalPort();
|
||||
ss.close();
|
||||
Server server = new Server(0);
|
||||
@ -91,6 +91,7 @@ public class MiniKMS {
|
||||
private String log4jConfFile;
|
||||
private File keyStoreFile;
|
||||
private String keyStorePassword;
|
||||
private int inPort = -1;
|
||||
|
||||
public Builder() {
|
||||
kmsConfDir = new File("target/test-classes").getAbsoluteFile();
|
||||
@ -111,6 +112,12 @@ public class MiniKMS {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setPort(int port) {
|
||||
Preconditions.checkArgument(port > 0, "input port must be greater than 0");
|
||||
this.inPort = port;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSslConf(File keyStoreFile, String keyStorePassword) {
|
||||
Preconditions.checkNotNull(keyStoreFile, "keystore file is NULL");
|
||||
Preconditions.checkNotNull(keyStorePassword, "keystore password is NULL");
|
||||
@ -126,7 +133,7 @@ public class MiniKMS {
|
||||
"KMS conf dir does not exist");
|
||||
return new MiniKMS(kmsConfDir.getAbsolutePath(), log4jConfFile,
|
||||
(keyStoreFile != null) ? keyStoreFile.getAbsolutePath() : null,
|
||||
keyStorePassword);
|
||||
keyStorePassword, inPort);
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,14 +142,16 @@ public class MiniKMS {
|
||||
private String keyStore;
|
||||
private String keyStorePassword;
|
||||
private Server jetty;
|
||||
private int inPort;
|
||||
private URL kmsURL;
|
||||
|
||||
public MiniKMS(String kmsConfDir, String log4ConfFile, String keyStore,
|
||||
String password) {
|
||||
String password, int inPort) {
|
||||
this.kmsConfDir = kmsConfDir;
|
||||
this.log4jConfFile = log4ConfFile;
|
||||
this.keyStore = keyStore;
|
||||
this.keyStorePassword = password;
|
||||
this.inPort = inPort;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
@ -174,7 +183,7 @@ public class MiniKMS {
|
||||
writer.close();
|
||||
}
|
||||
System.setProperty("log4j.configuration", log4jConfFile);
|
||||
jetty = createJettyServer(keyStore, keyStorePassword);
|
||||
jetty = createJettyServer(keyStore, keyStorePassword, inPort);
|
||||
|
||||
// we need to do a special handling for MiniKMS to work when in a dir and
|
||||
// when in a JAR in the classpath thanks to Jetty way of handling of webapps
|
||||
|
@ -89,7 +89,7 @@ public class TestKMS {
|
||||
return file;
|
||||
}
|
||||
|
||||
public static abstract class KMSCallable implements Callable<Void> {
|
||||
public static abstract class KMSCallable<T> implements Callable<T> {
|
||||
private URL kmsUrl;
|
||||
|
||||
protected URL getKMSUrl() {
|
||||
@ -97,19 +97,27 @@ public class TestKMS {
|
||||
}
|
||||
}
|
||||
|
||||
protected void runServer(String keystore, String password, File confDir,
|
||||
KMSCallable callable) throws Exception {
|
||||
protected <T> T runServer(String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServer(-1, keystore, password, confDir, callable);
|
||||
}
|
||||
|
||||
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
||||
.setLog4jConfFile("log4j.properties");
|
||||
if (keystore != null) {
|
||||
miniKMSBuilder.setSslConf(new File(keystore), password);
|
||||
}
|
||||
if (port > 0) {
|
||||
miniKMSBuilder.setPort(port);
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
miniKMS.start();
|
||||
try {
|
||||
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.kmsUrl = miniKMS.getKMSUrl();
|
||||
callable.call();
|
||||
return callable.call();
|
||||
} finally {
|
||||
miniKMS.stop();
|
||||
}
|
||||
@ -284,7 +292,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(keystore, password, testDir, new KMSCallable() {
|
||||
runServer(keystore, password, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -351,7 +359,7 @@ public class TestKMS {
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k6.ALL", "*");
|
||||
writeConf(confDir, conf);
|
||||
|
||||
runServer(null, null, confDir, new KMSCallable() {
|
||||
runServer(null, null, confDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Date started = new Date();
|
||||
@ -616,7 +624,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
@ -782,6 +790,92 @@ public class TestKMS {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSRestart() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
final File testDir = getTestDir();
|
||||
conf = createBaseKMSConf(testDir);
|
||||
conf.set("hadoop.kms.authentication.kerberos.keytab",
|
||||
keytab.getAbsolutePath());
|
||||
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
|
||||
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
|
||||
|
||||
for (KMSACLs.Type type : KMSACLs.Type.values()) {
|
||||
conf.set(type.getAclConfigKey(), type.toString());
|
||||
}
|
||||
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),
|
||||
KMSACLs.Type.CREATE.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
|
||||
KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
final KeyProvider retKp =
|
||||
runServer(null, null, testDir, c);
|
||||
|
||||
// Restart server (using the same port)
|
||||
runServer(c.getKMSUrl().getPort(), null, null, testDir,
|
||||
new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
try {
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
Assert.fail("Should fail first time !!");
|
||||
} catch (IOException e) {
|
||||
String message = e.getMessage();
|
||||
Assert.assertTrue("Should be a 403 error : " + message,
|
||||
message.contains("403"));
|
||||
}
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
retKp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testACLs() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
@ -809,7 +903,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1117,7 +1211,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1201,7 +1295,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1326,7 +1420,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1398,7 +1492,7 @@ public class TestKMS {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
|
@ -258,6 +258,18 @@ Trunk (Unreleased)
|
||||
|
||||
HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal)
|
||||
|
||||
Release 2.7.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 2.6.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
@ -760,6 +772,11 @@ Release 2.6.0 - UNRELEASED
|
||||
HDFS-7157. Using Time.now() for recording start/end time of reconfiguration
|
||||
tasks (Lei Xu via cmccabe)
|
||||
|
||||
HDFS-6664. HDFS permissions guide documentation states incorrect default
|
||||
group mapping class. (Ray Chiang via aw)
|
||||
|
||||
HDFS-4227. Document dfs.namenode.resource.* (Daisuke Kobayashi via aw)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
||||
@ -991,10 +1008,7 @@ Release 2.6.0 - UNRELEASED
|
||||
HDFS-7140. Add a tool to list all the existing block storage policies.
|
||||
(jing9)
|
||||
|
||||
HDFS-6664. HDFS permissions guide documentation states incorrect default
|
||||
group mapping class. (Ray Chiang via aw)
|
||||
|
||||
HDFS-4227. Document dfs.namenode.resource.* (Daisuke Kobayashi via aw)
|
||||
HDFS-7167. NPE while running Mover if the given path is for a file. (jing9)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
|
@ -252,14 +252,9 @@ public class Mover {
|
||||
*/
|
||||
private boolean processNamespace() {
|
||||
getSnapshottableDirs();
|
||||
boolean hasRemaining = true;
|
||||
try {
|
||||
for (Path target : targetPaths) {
|
||||
hasRemaining = processDirRecursively("", dfs.getFileInfo(target
|
||||
.toUri().getPath()));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get root directory status. Ignore and continue.", e);
|
||||
boolean hasRemaining = false;
|
||||
for (Path target : targetPaths) {
|
||||
hasRemaining |= processPath(target.toUri().getPath());
|
||||
}
|
||||
// wait for pending move to finish and retry the failed migration
|
||||
hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
|
||||
@ -270,7 +265,7 @@ public class Mover {
|
||||
* @return whether there is still remaing migration work for the next
|
||||
* round
|
||||
*/
|
||||
private boolean processChildrenList(String fullPath) {
|
||||
private boolean processPath(String fullPath) {
|
||||
boolean hasRemaining = false;
|
||||
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
|
||||
final DirectoryListing children;
|
||||
@ -285,7 +280,7 @@ public class Mover {
|
||||
return hasRemaining;
|
||||
}
|
||||
for (HdfsFileStatus child : children.getPartialListing()) {
|
||||
hasRemaining |= processDirRecursively(fullPath, child);
|
||||
hasRemaining |= processRecursively(fullPath, child);
|
||||
}
|
||||
if (children.hasMore()) {
|
||||
lastReturnedName = children.getLastName();
|
||||
@ -296,8 +291,7 @@ public class Mover {
|
||||
}
|
||||
|
||||
/** @return whether the migration requires next round */
|
||||
private boolean processDirRecursively(String parent,
|
||||
HdfsFileStatus status) {
|
||||
private boolean processRecursively(String parent, HdfsFileStatus status) {
|
||||
String fullPath = status.getFullName(parent);
|
||||
boolean hasRemaining = false;
|
||||
if (status.isDir()) {
|
||||
@ -305,11 +299,11 @@ public class Mover {
|
||||
fullPath = fullPath + Path.SEPARATOR;
|
||||
}
|
||||
|
||||
hasRemaining = processChildrenList(fullPath);
|
||||
hasRemaining = processPath(fullPath);
|
||||
// process snapshots if this is a snapshottable directory
|
||||
if (snapshottableDirs.contains(fullPath)) {
|
||||
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
|
||||
hasRemaining |= processChildrenList(dirSnapshot);
|
||||
hasRemaining |= processPath(dirSnapshot);
|
||||
}
|
||||
} else if (!status.isSymlink()) { // file
|
||||
try {
|
||||
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.mover;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
@ -27,12 +26,10 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.ReconfigurationException;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
@ -47,7 +44,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
@ -533,6 +529,42 @@ public class TestStorageMover {
|
||||
"==================================================\n\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Run Mover with arguments specifying files and directories
|
||||
*/
|
||||
@Test
|
||||
public void testMoveSpecificPaths() throws Exception {
|
||||
LOG.info("testMoveSpecificPaths");
|
||||
final Path foo = new Path("/foo");
|
||||
final Path barFile = new Path(foo, "bar");
|
||||
final Path foo2 = new Path("/foo2");
|
||||
final Path bar2File = new Path(foo2, "bar2");
|
||||
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
|
||||
policyMap.put(foo, COLD);
|
||||
policyMap.put(foo2, WARM);
|
||||
NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo, foo2),
|
||||
Arrays.asList(barFile, bar2File), BLOCK_SIZE, null, policyMap);
|
||||
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
|
||||
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
|
||||
MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
|
||||
test.setupCluster();
|
||||
|
||||
try {
|
||||
test.prepareNamespace();
|
||||
test.setStoragePolicy();
|
||||
|
||||
Map<URI, List<Path>> map = Mover.Cli.getNameNodePathsToMove(test.conf,
|
||||
"-p", "/foo/bar", "/foo2");
|
||||
int result = Mover.run(map, test.conf);
|
||||
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
|
||||
|
||||
Thread.sleep(5000);
|
||||
test.verify(true);
|
||||
} finally {
|
||||
test.shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Move an open file into archival storage
|
||||
*/
|
||||
|
@ -224,6 +224,18 @@ Trunk (Unreleased)
|
||||
|
||||
MAPREDUCE-6078. native-task: fix gtest build on macosx (Binglin Chang)
|
||||
|
||||
Release 2.7.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 2.6.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -27,6 +27,18 @@ Trunk - Unreleased
|
||||
|
||||
YARN-2525. yarn logs command gives error on trunk (Akira AJISAKA via aw)
|
||||
|
||||
Release 2.7.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 2.6.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
@ -268,6 +280,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-1769. CapacityScheduler: Improve reservations (Thomas Graves via
|
||||
jlowe)
|
||||
|
||||
YARN-2627. Added the info logs of attemptFailuresValidityInterval and number
|
||||
of previous failed attempts. (Xuan Gong via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
@ -468,6 +483,17 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2606. Application History Server tries to access hdfs before doing
|
||||
secure login (Mit Desai via jeagles)
|
||||
|
||||
YARN-2610. Hamlet should close table tags. (Ray Chiang via kasha)
|
||||
|
||||
YARN-2387. Resource Manager crashes with NPE due to lack of
|
||||
synchronization (Mit Desai via jlowe)
|
||||
|
||||
YARN-2594. Potential deadlock in RM when querying
|
||||
ApplicationResourceUsageReport. (Wangda Tan via kasha)
|
||||
|
||||
YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
|
||||
(Zhijie Shen via jianhe)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -50,7 +50,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public ContainerStatusProto getProto() {
|
||||
public synchronized ContainerStatusProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
@ -90,7 +90,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
private synchronized void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
@ -98,7 +98,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = ContainerStatusProto.newBuilder(proto);
|
||||
}
|
||||
@ -107,7 +107,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
|
||||
|
||||
@Override
|
||||
public ContainerState getState() {
|
||||
public synchronized ContainerState getState() {
|
||||
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasState()) {
|
||||
return null;
|
||||
@ -116,7 +116,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setState(ContainerState state) {
|
||||
public synchronized void setState(ContainerState state) {
|
||||
maybeInitBuilder();
|
||||
if (state == null) {
|
||||
builder.clearState();
|
||||
@ -125,7 +125,7 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
builder.setState(convertToProtoFormat(state));
|
||||
}
|
||||
@Override
|
||||
public ContainerId getContainerId() {
|
||||
public synchronized ContainerId getContainerId() {
|
||||
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.containerId != null) {
|
||||
return this.containerId;
|
||||
@ -138,32 +138,32 @@ public class ContainerStatusPBImpl extends ContainerStatus {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContainerId(ContainerId containerId) {
|
||||
public synchronized void setContainerId(ContainerId containerId) {
|
||||
maybeInitBuilder();
|
||||
if (containerId == null)
|
||||
builder.clearContainerId();
|
||||
this.containerId = containerId;
|
||||
}
|
||||
@Override
|
||||
public int getExitStatus() {
|
||||
public synchronized int getExitStatus() {
|
||||
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getExitStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExitStatus(int exitStatus) {
|
||||
public synchronized void setExitStatus(int exitStatus) {
|
||||
maybeInitBuilder();
|
||||
builder.setExitStatus(exitStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDiagnostics() {
|
||||
public synchronized String getDiagnostics() {
|
||||
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getDiagnostics());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDiagnostics(String diagnostics) {
|
||||
public synchronized void setDiagnostics(String diagnostics) {
|
||||
maybeInitBuilder();
|
||||
builder.setDiagnostics(diagnostics);
|
||||
}
|
||||
|
@ -2423,10 +2423,10 @@ public class Hamlet extends HamletImpl implements HamletSpec._Html {
|
||||
}
|
||||
|
||||
private <T extends _> TH<T> th_(T e, boolean inline) {
|
||||
return new TH<T>("th", e, opt(false, inline, false)); }
|
||||
return new TH<T>("th", e, opt(true, inline, false)); }
|
||||
|
||||
private <T extends _> TD<T> td_(T e, boolean inline) {
|
||||
return new TD<T>("td", e, opt(false, inline, false)); }
|
||||
return new TD<T>("td", e, opt(true, inline, false)); }
|
||||
|
||||
public class COL<T extends _> extends EImp<T> implements HamletSpec.COL {
|
||||
public COL(String name, T parent, EnumSet<EOpt> opts) {
|
||||
@ -3719,10 +3719,10 @@ public class Hamlet extends HamletImpl implements HamletSpec._Html {
|
||||
return new COLGROUP<T>("colgroup", e, opt(false, inline, false)); }
|
||||
|
||||
private <T extends _> THEAD<T> thead_(T e, boolean inline) {
|
||||
return new THEAD<T>("thead", e, opt(false, inline, false)); }
|
||||
return new THEAD<T>("thead", e, opt(true, inline, false)); }
|
||||
|
||||
private <T extends _> TFOOT<T> tfoot_(T e, boolean inline) {
|
||||
return new TFOOT<T>("tfoot", e, opt(false, inline, false)); }
|
||||
return new TFOOT<T>("tfoot", e, opt(true, inline, false)); }
|
||||
|
||||
private <T extends _> TBODY<T> tbody_(T e, boolean inline) {
|
||||
return new TBODY<T>("tbody", e, opt(true, inline, false)); }
|
||||
@ -3731,7 +3731,7 @@ public class Hamlet extends HamletImpl implements HamletSpec._Html {
|
||||
return new COL<T>("col", e, opt(false, inline, false)); }
|
||||
|
||||
private <T extends _> TR<T> tr_(T e, boolean inline) {
|
||||
return new TR<T>("tr", e, opt(false, inline, false)); }
|
||||
return new TR<T>("tr", e, opt(true, inline, false)); }
|
||||
|
||||
public class BUTTON<T extends _> extends EImp<T> implements HamletSpec.BUTTON {
|
||||
public BUTTON(String name, T parent, EnumSet<EOpt> opts) {
|
||||
|
@ -88,8 +88,8 @@ public class TestHamlet {
|
||||
assertEquals(0, h.nestLevel);
|
||||
verify(out).print("<table");
|
||||
verify(out).print("</table>");
|
||||
verify(out, never()).print("</td>");
|
||||
verify(out, never()).print("</tr>");
|
||||
verify(out, atLeast(1)).print("</td>");
|
||||
verify(out, atLeast(1)).print("</tr>");
|
||||
}
|
||||
|
||||
@Test public void testEnumAttrs() {
|
||||
|
@ -68,7 +68,7 @@ public class TestInfoBlock {
|
||||
|
||||
static {
|
||||
resInfo = new ResponseInfo();
|
||||
resInfo._("Single_line_value", "This is one line.");
|
||||
resInfo._("Multiple_line_value", "This is one line.");
|
||||
resInfo._("Multiple_line_value", "This is first line.\nThis is second line.");
|
||||
}
|
||||
|
||||
@ -98,13 +98,14 @@ public class TestInfoBlock {
|
||||
WebAppTests.testBlock(MultilineInfoBlock.class);
|
||||
TestInfoBlock.pw.flush();
|
||||
String output = TestInfoBlock.sw.toString().replaceAll(" +", " ");
|
||||
String expectedSinglelineData = String.format("<tr class=\"odd\">%n"
|
||||
+ " <th>%n Single_line_value%n <td>%n This is one line.%n");
|
||||
String expectedMultilineData = String.format("<tr class=\"even\">%n"
|
||||
+ " <th>%n Multiple_line_value%n <td>%n <div>%n"
|
||||
String expectedMultilineData1 = String.format("<tr class=\"odd\">%n"
|
||||
+ " <th>%n Multiple_line_value%n </th>%n"
|
||||
+ " <td>%n This is one line.%n </td>%n");
|
||||
String expectedMultilineData2 = String.format("<tr class=\"even\">%n"
|
||||
+ " <th>%n Multiple_line_value%n </th>%n <td>%n <div>%n"
|
||||
+ " This is first line.%n </div>%n <div>%n"
|
||||
+ " This is second line.%n </div>%n");
|
||||
assertTrue(output.contains(expectedSinglelineData) && output.contains(expectedMultilineData));
|
||||
assertTrue(output.contains(expectedMultilineData1) && output.contains(expectedMultilineData2));
|
||||
}
|
||||
|
||||
@Test(timeout=60000L)
|
||||
@ -115,4 +116,4 @@ public class TestInfoBlock {
|
||||
assertFalse(output.contains("<script>"));
|
||||
assertTrue(output.contains(JAVASCRIPT_ESCAPED));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -227,7 +227,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)) {
|
||||
String appViewACLsStr = entityInfo.get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO).toString();
|
||||
appViewACLs.put(ApplicationAccessType.VIEW_APP, appViewACLsStr);
|
||||
if (appViewACLsStr.length() > 0) {
|
||||
appViewACLs.put(ApplicationAccessType.VIEW_APP, appViewACLsStr);
|
||||
}
|
||||
}
|
||||
if (field == ApplicationReportField.USER_AND_ACLS) {
|
||||
return new ApplicationReportExt(ApplicationReport.newInstance(
|
||||
|
@ -122,7 +122,11 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
||||
for (int i = 1; i <= SCALE; ++i) {
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
ApplicationId appId = ApplicationId.newInstance(0, i);
|
||||
entities.addEntity(createApplicationTimelineEntity(appId));
|
||||
if (i == 2) {
|
||||
entities.addEntity(createApplicationTimelineEntity(appId, true));
|
||||
} else {
|
||||
entities.addEntity(createApplicationTimelineEntity(appId, false));
|
||||
}
|
||||
store.put(entities);
|
||||
for (int j = 1; j <= SCALE; ++j) {
|
||||
entities = new TimelineEntities();
|
||||
@ -142,50 +146,58 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
||||
|
||||
@Test
|
||||
public void testGetApplicationReport() throws Exception {
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationReport app;
|
||||
if (callerUGI == null) {
|
||||
app = historyManager.getApplication(appId);
|
||||
} else {
|
||||
app =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationReport> () {
|
||||
@Override
|
||||
public ApplicationReport run() throws Exception {
|
||||
return historyManager.getApplication(appId);
|
||||
}
|
||||
});
|
||||
for (int i = 1; i <= 2; ++i) {
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, i);
|
||||
ApplicationReport app;
|
||||
if (callerUGI == null) {
|
||||
app = historyManager.getApplication(appId);
|
||||
} else {
|
||||
app =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationReport> () {
|
||||
@Override
|
||||
public ApplicationReport run() throws Exception {
|
||||
return historyManager.getApplication(appId);
|
||||
}
|
||||
});
|
||||
}
|
||||
Assert.assertNotNull(app);
|
||||
Assert.assertEquals(appId, app.getApplicationId());
|
||||
Assert.assertEquals("test app", app.getName());
|
||||
Assert.assertEquals("test app type", app.getApplicationType());
|
||||
Assert.assertEquals("user1", app.getUser());
|
||||
Assert.assertEquals("test queue", app.getQueue());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime());
|
||||
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
|
||||
// App 2 doesn't have the ACLs, such that the default ACLs " " will be used.
|
||||
// Nobody except admin and owner has access to the details of the app.
|
||||
if ((i == 1 && callerUGI != null &&
|
||||
callerUGI.getShortUserName().equals("user3")) ||
|
||||
(i == 2 && callerUGI != null &&
|
||||
(callerUGI.getShortUserName().equals("user2") ||
|
||||
callerUGI.getShortUserName().equals("user3")))) {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, -1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals(null, app.getHost());
|
||||
Assert.assertEquals(-1, app.getRpcPort());
|
||||
Assert.assertEquals(null, app.getTrackingUrl());
|
||||
Assert.assertEquals(null, app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals(null, app.getDiagnostics());
|
||||
} else {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, 1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals("test host", app.getHost());
|
||||
Assert.assertEquals(-100, app.getRpcPort());
|
||||
Assert.assertEquals("test tracking url", app.getTrackingUrl());
|
||||
Assert.assertEquals("test original tracking url",
|
||||
app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
|
||||
}
|
||||
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
|
||||
app.getFinalApplicationStatus());
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED,
|
||||
app.getYarnApplicationState());
|
||||
}
|
||||
Assert.assertNotNull(app);
|
||||
Assert.assertEquals(appId, app.getApplicationId());
|
||||
Assert.assertEquals("test app", app.getName());
|
||||
Assert.assertEquals("test app type", app.getApplicationType());
|
||||
Assert.assertEquals("user1", app.getUser());
|
||||
Assert.assertEquals("test queue", app.getQueue());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime());
|
||||
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, -1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals(null, app.getHost());
|
||||
Assert.assertEquals(-1, app.getRpcPort());
|
||||
Assert.assertEquals(null, app.getTrackingUrl());
|
||||
Assert.assertEquals(null, app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals(null, app.getDiagnostics());
|
||||
} else {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, 1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals("test host", app.getHost());
|
||||
Assert.assertEquals(-100, app.getRpcPort());
|
||||
Assert.assertEquals("test tracking url", app.getTrackingUrl());
|
||||
Assert.assertEquals("test original tracking url",
|
||||
app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
|
||||
}
|
||||
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
|
||||
app.getFinalApplicationStatus());
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED,
|
||||
app.getYarnApplicationState());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -396,7 +408,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
||||
}
|
||||
|
||||
private static TimelineEntity createApplicationTimelineEntity(
|
||||
ApplicationId appId) {
|
||||
ApplicationId appId, boolean emptyACLs) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
|
||||
entity.setEntityId(appId.toString());
|
||||
@ -410,8 +422,12 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
||||
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, "test queue");
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
Integer.MAX_VALUE + 1L);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
||||
"user2");
|
||||
if (emptyACLs) {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, "");
|
||||
} else {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
||||
"user2");
|
||||
}
|
||||
entity.setOtherInfo(entityInfo);
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
|
@ -137,7 +137,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationACLsUpdatedEvent(
|
||||
app.getApplicationId(),
|
||||
appViewACLs,
|
||||
appViewACLs == null ? "" : appViewACLs,
|
||||
updatedTime));
|
||||
}
|
||||
}
|
||||
|
@ -128,7 +128,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
private long startTime;
|
||||
private long finishTime = 0;
|
||||
private long storedFinishTime = 0;
|
||||
private RMAppAttempt currentAttempt;
|
||||
// This field isn't protected by readlock now.
|
||||
private volatile RMAppAttempt currentAttempt;
|
||||
private String queue;
|
||||
private EventHandler handler;
|
||||
private static final AppFinishedTransition FINISHED_TRANSITION =
|
||||
@ -376,6 +377,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
this.attemptFailuresValidityInterval =
|
||||
submissionContext.getAttemptFailuresValidityInterval();
|
||||
if (this.attemptFailuresValidityInterval > 0) {
|
||||
LOG.info("The attemptFailuresValidityInterval for the application: "
|
||||
+ this.applicationId + " is " + this.attemptFailuresValidityInterval
|
||||
+ ".");
|
||||
}
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
@ -433,16 +439,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
@Override
|
||||
public float getProgress() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
if (this.currentAttempt != null) {
|
||||
return this.currentAttempt.getProgress();
|
||||
}
|
||||
return 0;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
RMAppAttempt attempt = this.currentAttempt;
|
||||
if (attempt != null) {
|
||||
return attempt.getProgress();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -473,13 +474,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
@Override
|
||||
public RMAppAttempt getCurrentAppAttempt() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
return this.currentAttempt;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return this.currentAttempt;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -650,30 +645,20 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
|
||||
@Override
|
||||
public String getTrackingUrl() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
if (this.currentAttempt != null) {
|
||||
return this.currentAttempt.getTrackingUrl();
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
RMAppAttempt attempt = this.currentAttempt;
|
||||
if (attempt != null) {
|
||||
return attempt.getTrackingUrl();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOriginalTrackingUrl() {
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
if (this.currentAttempt != null) {
|
||||
return this.currentAttempt.getOriginalTrackingUrl();
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
RMAppAttempt attempt = this.currentAttempt;
|
||||
if (attempt != null) {
|
||||
return attempt.getOriginalTrackingUrl();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1179,6 +1164,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
int numberOfFailure = app.getNumFailedAppAttempts();
|
||||
LOG.info("The number of failed attempts"
|
||||
+ (app.attemptFailuresValidityInterval > 0 ? " in previous "
|
||||
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
|
||||
+ "is " + numberOfFailure + ". The max attempts is "
|
||||
+ app.maxAppAttempts);
|
||||
if (!app.submissionContext.getUnmanagedAM()
|
||||
&& numberOfFailure < app.maxAppAttempts) {
|
||||
boolean transferStateFromPreviousAttempt;
|
||||
@ -1293,4 +1283,5 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||
public void setSystemClock(Clock clock) {
|
||||
this.systemClock = clock;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -95,77 +95,89 @@ public class TestSystemMetricsPublisher {
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testPublishApplicationMetrics() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
RMApp app = createRMApp(appId);
|
||||
metricsPublisher.appCreated(app, app.getStartTime());
|
||||
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
|
||||
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
|
||||
TimelineEntity entity = null;
|
||||
do {
|
||||
entity =
|
||||
store.getEntity(appId.toString(),
|
||||
ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
EnumSet.allOf(Field.class));
|
||||
// ensure three events are both published before leaving the loop
|
||||
} while (entity == null || entity.getEvents().size() < 3);
|
||||
// verify all the fields
|
||||
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
entity.getEntityType());
|
||||
Assert
|
||||
.assertEquals(app.getApplicationId().toString(), entity.getEntityId());
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getName(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.NAME_ENTITY_INFO));
|
||||
Assert.assertEquals(app.getQueue(),
|
||||
entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getUser(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.USER_ENTITY_INFO));
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getApplicationType(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.TYPE_ENTITY_INFO));
|
||||
Assert.assertEquals(app.getSubmitTime(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
|
||||
Assert.assertEquals("uers1,user2",
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
|
||||
boolean hasCreatedEvent = false;
|
||||
boolean hasFinishedEvent = false;
|
||||
boolean hasACLsUpdatedEvent = false;
|
||||
for (TimelineEvent event : entity.getEvents()) {
|
||||
if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
|
||||
hasCreatedEvent = true;
|
||||
Assert.assertEquals(app.getStartTime(), event.getTimestamp());
|
||||
} else if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
|
||||
hasFinishedEvent = true;
|
||||
Assert.assertEquals(app.getFinishTime(), event.getTimestamp());
|
||||
Assert.assertEquals(
|
||||
app.getDiagnostics().toString(),
|
||||
event.getEventInfo().get(
|
||||
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
|
||||
Assert.assertEquals(
|
||||
app.getFinalApplicationStatus().toString(),
|
||||
event.getEventInfo().get(
|
||||
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
|
||||
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
|
||||
} else if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
|
||||
hasACLsUpdatedEvent = true;
|
||||
Assert.assertEquals(4L, event.getTimestamp());
|
||||
for (int i = 1; i <= 2; ++i) {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, i);
|
||||
RMApp app = createRMApp(appId);
|
||||
metricsPublisher.appCreated(app, app.getStartTime());
|
||||
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
|
||||
if (i == 1) {
|
||||
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
|
||||
} else {
|
||||
// in case user doesn't specify the ACLs
|
||||
metricsPublisher.appACLsUpdated(app, null, 4L);
|
||||
}
|
||||
TimelineEntity entity = null;
|
||||
do {
|
||||
entity =
|
||||
store.getEntity(appId.toString(),
|
||||
ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
EnumSet.allOf(Field.class));
|
||||
// ensure three events are both published before leaving the loop
|
||||
} while (entity == null || entity.getEvents().size() < 3);
|
||||
// verify all the fields
|
||||
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
entity.getEntityType());
|
||||
Assert
|
||||
.assertEquals(app.getApplicationId().toString(), entity.getEntityId());
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getName(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.NAME_ENTITY_INFO));
|
||||
Assert.assertEquals(app.getQueue(),
|
||||
entity.getOtherInfo()
|
||||
.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getUser(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.USER_ENTITY_INFO));
|
||||
Assert
|
||||
.assertEquals(
|
||||
app.getApplicationType(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.TYPE_ENTITY_INFO));
|
||||
Assert.assertEquals(app.getSubmitTime(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
|
||||
if (i == 1) {
|
||||
Assert.assertEquals("uers1,user2",
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
|
||||
} else {
|
||||
Assert.assertEquals("", entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
|
||||
}
|
||||
boolean hasCreatedEvent = false;
|
||||
boolean hasFinishedEvent = false;
|
||||
boolean hasACLsUpdatedEvent = false;
|
||||
for (TimelineEvent event : entity.getEvents()) {
|
||||
if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
|
||||
hasCreatedEvent = true;
|
||||
Assert.assertEquals(app.getStartTime(), event.getTimestamp());
|
||||
} else if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
|
||||
hasFinishedEvent = true;
|
||||
Assert.assertEquals(app.getFinishTime(), event.getTimestamp());
|
||||
Assert.assertEquals(
|
||||
app.getDiagnostics().toString(),
|
||||
event.getEventInfo().get(
|
||||
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
|
||||
Assert.assertEquals(
|
||||
app.getFinalApplicationStatus().toString(),
|
||||
event.getEventInfo().get(
|
||||
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
|
||||
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
|
||||
} else if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
|
||||
hasACLsUpdatedEvent = true;
|
||||
Assert.assertEquals(4L, event.getTimestamp());
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
|
||||
}
|
||||
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
|
Loading…
x
Reference in New Issue
Block a user