Merge remote-tracking branch 'apache-commit/trunk' into HDFS-6581

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
This commit is contained in:
arp 2014-08-29 13:37:17 -07:00
commit f65183eba3
51 changed files with 779 additions and 418 deletions

View File

@ -61,6 +61,16 @@
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-logging-juli</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>

View File

@ -519,9 +519,7 @@ public static void createAuthCookie(HttpServletResponse resp, String token,
StringBuilder sb = new StringBuilder(AuthenticatedURL.AUTH_COOKIE)
.append("=");
if (token != null && token.length() > 0) {
sb.append("\"")
.append(token)
.append("\"");
sb.append(token);
}
sb.append("; Version=1");

View File

@ -13,7 +13,22 @@
*/
package org.apache.hadoop.security.authentication.client;
import org.apache.catalina.deploy.FilterDef;
import org.apache.catalina.deploy.FilterMap;
import org.apache.catalina.startup.Tomcat;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.params.AuthPolicy;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.SystemDefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.FilterHolder;
@ -24,16 +39,19 @@
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URL;
import java.security.Principal;
import java.util.Properties;
import org.junit.Assert;
@ -41,10 +59,18 @@ public class AuthenticatorTestCase {
private Server server;
private String host = null;
private int port = -1;
private boolean useTomcat = false;
private Tomcat tomcat = null;
Context context;
private static Properties authenticatorConfig;
public AuthenticatorTestCase() {}
public AuthenticatorTestCase(boolean useTomcat) {
this.useTomcat = useTomcat;
}
protected static void setAuthenticationHandlerConfig(Properties config) {
authenticatorConfig = config;
}
@ -80,7 +106,19 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
}
}
protected int getLocalPort() throws Exception {
ServerSocket ss = new ServerSocket(0);
int ret = ss.getLocalPort();
ss.close();
return ret;
}
protected void start() throws Exception {
if (useTomcat) startTomcat();
else startJetty();
}
protected void startJetty() throws Exception {
server = new Server(0);
context = new Context();
context.setContextPath("/foo");
@ -88,16 +126,42 @@ protected void start() throws Exception {
context.addFilter(new FilterHolder(TestFilter.class), "/*", 0);
context.addServlet(new ServletHolder(TestServlet.class), "/bar");
host = "localhost";
ServerSocket ss = new ServerSocket(0);
port = ss.getLocalPort();
ss.close();
port = getLocalPort();
server.getConnectors()[0].setHost(host);
server.getConnectors()[0].setPort(port);
server.start();
System.out.println("Running embedded servlet container at: http://" + host + ":" + port);
}
protected void startTomcat() throws Exception {
tomcat = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
org.apache.catalina.Context ctx =
tomcat.addContext("/foo",base.getAbsolutePath());
FilterDef fd = new FilterDef();
fd.setFilterClass(TestFilter.class.getName());
fd.setFilterName("TestFilter");
FilterMap fm = new FilterMap();
fm.setFilterName("TestFilter");
fm.addURLPattern("/*");
fm.addServletName("/bar");
ctx.addFilterDef(fd);
ctx.addFilterMap(fm);
tomcat.addServlet(ctx, "/bar", TestServlet.class.getName());
ctx.addServletMapping("/bar", "/bar");
host = "localhost";
port = getLocalPort();
tomcat.setHostname(host);
tomcat.setPort(port);
tomcat.start();
}
protected void stop() throws Exception {
if (useTomcat) stopTomcat();
else stopJetty();
}
protected void stopJetty() throws Exception {
try {
server.stop();
} catch (Exception e) {
@ -109,6 +173,18 @@ protected void stop() throws Exception {
}
}
protected void stopTomcat() throws Exception {
try {
tomcat.stop();
} catch (Exception e) {
}
try {
tomcat.destroy();
} catch (Exception e) {
}
}
protected String getBaseURL() {
return "http://" + host + ":" + port + "/foo/bar";
}
@ -165,4 +241,57 @@ protected void _testAuthentication(Authenticator authenticator, boolean doPost)
}
}
private SystemDefaultHttpClient getHttpClient() {
final SystemDefaultHttpClient httpClient = new SystemDefaultHttpClient();
httpClient.getAuthSchemes().register(AuthPolicy.SPNEGO, new SPNegoSchemeFactory(true));
Credentials use_jaas_creds = new Credentials() {
public String getPassword() {
return null;
}
public Principal getUserPrincipal() {
return null;
}
};
httpClient.getCredentialsProvider().setCredentials(
AuthScope.ANY, use_jaas_creds);
return httpClient;
}
private void doHttpClientRequest(HttpClient httpClient, HttpUriRequest request) throws Exception {
HttpResponse response = null;
try {
response = httpClient.execute(request);
final int httpStatus = response.getStatusLine().getStatusCode();
Assert.assertEquals(HttpURLConnection.HTTP_OK, httpStatus);
} finally {
if (response != null) EntityUtils.consumeQuietly(response.getEntity());
}
}
protected void _testAuthenticationHttpClient(Authenticator authenticator, boolean doPost) throws Exception {
start();
try {
SystemDefaultHttpClient httpClient = getHttpClient();
doHttpClientRequest(httpClient, new HttpGet(getBaseURL()));
// Always do a GET before POST to trigger the SPNego negotiation
if (doPost) {
HttpPost post = new HttpPost(getBaseURL());
byte [] postBytes = POST.getBytes();
ByteArrayInputStream bis = new ByteArrayInputStream(postBytes);
InputStreamEntity entity = new InputStreamEntity(bis, postBytes.length);
// Important that the entity is not repeatable -- this means if
// we have to renegotiate (e.g. b/c the cookie wasn't handled properly)
// the test will fail.
Assert.assertFalse(entity.isRepeatable());
post.setEntity(entity);
doHttpClientRequest(httpClient, post);
}
} finally {
stop();
}
}
}

View File

@ -20,16 +20,36 @@
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runner.RunWith;
import org.junit.Test;
import java.io.File;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Callable;
@RunWith(Parameterized.class)
public class TestKerberosAuthenticator extends KerberosSecurityTestcase {
private boolean useTomcat = false;
public TestKerberosAuthenticator(boolean useTomcat) {
this.useTomcat = useTomcat;
}
@Parameterized.Parameters
public static Collection booleans() {
return Arrays.asList(new Object[][] {
{ false },
{ true }
});
}
@Before
public void setup() throws Exception {
// create keytab
@ -53,7 +73,7 @@ private Properties getAuthenticationHandlerConfiguration() {
@Test(timeout=60000)
public void testFallbacktoPseudoAuthenticator() throws Exception {
AuthenticatorTestCase auth = new AuthenticatorTestCase();
AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
Properties props = new Properties();
props.setProperty(AuthenticationFilter.AUTH_TYPE, "simple");
props.setProperty(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
@ -63,7 +83,7 @@ public void testFallbacktoPseudoAuthenticator() throws Exception {
@Test(timeout=60000)
public void testFallbacktoPseudoAuthenticatorAnonymous() throws Exception {
AuthenticatorTestCase auth = new AuthenticatorTestCase();
AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
Properties props = new Properties();
props.setProperty(AuthenticationFilter.AUTH_TYPE, "simple");
props.setProperty(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "true");
@ -73,7 +93,7 @@ public void testFallbacktoPseudoAuthenticatorAnonymous() throws Exception {
@Test(timeout=60000)
public void testNotAuthenticated() throws Exception {
AuthenticatorTestCase auth = new AuthenticatorTestCase();
AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
AuthenticatorTestCase.setAuthenticationHandlerConfig(getAuthenticationHandlerConfiguration());
auth.start();
try {
@ -89,7 +109,7 @@ public void testNotAuthenticated() throws Exception {
@Test(timeout=60000)
public void testAuthentication() throws Exception {
final AuthenticatorTestCase auth = new AuthenticatorTestCase();
final AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
AuthenticatorTestCase.setAuthenticationHandlerConfig(
getAuthenticationHandlerConfiguration());
KerberosTestUtils.doAsClient(new Callable<Void>() {
@ -103,7 +123,7 @@ public Void call() throws Exception {
@Test(timeout=60000)
public void testAuthenticationPost() throws Exception {
final AuthenticatorTestCase auth = new AuthenticatorTestCase();
final AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
AuthenticatorTestCase.setAuthenticationHandlerConfig(
getAuthenticationHandlerConfiguration());
KerberosTestUtils.doAsClient(new Callable<Void>() {
@ -114,4 +134,32 @@ public Void call() throws Exception {
}
});
}
@Test(timeout=60000)
public void testAuthenticationHttpClient() throws Exception {
final AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
AuthenticatorTestCase.setAuthenticationHandlerConfig(
getAuthenticationHandlerConfiguration());
KerberosTestUtils.doAsClient(new Callable<Void>() {
@Override
public Void call() throws Exception {
auth._testAuthenticationHttpClient(new KerberosAuthenticator(), false);
return null;
}
});
}
@Test(timeout=60000)
public void testAuthenticationHttpClientPost() throws Exception {
final AuthenticatorTestCase auth = new AuthenticatorTestCase(useTomcat);
AuthenticatorTestCase.setAuthenticationHandlerConfig(
getAuthenticationHandlerConfiguration());
KerberosTestUtils.doAsClient(new Callable<Void>() {
@Override
public Void call() throws Exception {
auth._testAuthenticationHttpClient(new KerberosAuthenticator(), true);
return null;
}
});
}
}

View File

@ -473,6 +473,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11005. Fix HTTP content type for ReconfigurationServlet.
(Lei Xu via wang)
HADOOP-10814. Update Tomcat version used by HttpFS and KMS to latest
6.x version. (rkanter via tucu)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -710,6 +713,9 @@ Release 2.6.0 - UNRELEASED
loaded. (umamahesh)
--
HADOOP-10911. hadoop.auth cookie after HADOOP-10710 still not proper
according to RFC2109. (gchanan via tucu)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -34,7 +34,6 @@
<description>Apache Hadoop KMS</description>
<properties>
<tomcat.version>6.0.36</tomcat.version>
<kms.tomcat.dist.dir>
${project.build.directory}/${project.artifactId}-${project.version}/share/hadoop/kms/tomcat
</kms.tomcat.dist.dir>

View File

@ -34,7 +34,6 @@
<description>Apache Hadoop HttpFS</description>
<properties>
<tomcat.version>6.0.36</tomcat.version>
<httpfs.source.repository>REPO NOT AVAIL</httpfs.source.repository>
<httpfs.source.repository>REPO NOT AVAIL</httpfs.source.repository>
<httpfs.source.revision>REVISION NOT AVAIL</httpfs.source.revision>

View File

@ -427,6 +427,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick
McCabe)
HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu
via atm)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -571,6 +574,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6902. FileWriter should be closed in finally block in
BlockReceiver#receiveBlock() (Tsuyoshi OZAWA via Colin Patrick McCabe)
HDFS-6800. Support Datanode layout changes with rolling upgrade.
(James Thomas via Arpit Agarwal)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -201,6 +201,20 @@ private void format(StorageDirectory bpSdir, NamespaceInfo nsInfo) throws IOExce
writeProperties(bpSdir);
}
/**
* Remove storage directories.
* @param storageDirs a set of storage directories to be removed.
*/
void removeVolumes(Set<File> storageDirs) {
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (storageDirs.contains(sd.getRoot())) {
it.remove();
}
}
}
/**
* Set layoutVersion, namespaceID and blockpoolID into block pool storage
* VERSION file
@ -255,7 +269,14 @@ protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
*/
private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
// we will already restore everything in the trash by rolling back to
// the previous directory, so we must delete the trash to ensure
// that it's not restored by BPOfferService.signalRollingUpgrade()
if (!FileUtil.fullyDelete(getTrashRootDir(sd))) {
throw new IOException("Unable to delete trash directory prior to " +
"restoration of previous directory: " + getTrashRootDir(sd));
}
doRollback(sd, nsInfo); // rollback if applicable
} else {
// Restore all the files in the trash. The restored files are retained

View File

@ -244,10 +244,9 @@ public class DataNode extends Configured
LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
private static final String USAGE =
"Usage: java DataNode [-regular | -rollback | -rollingupgrade rollback]\n" +
"Usage: java DataNode [-regular | -rollback]\n" +
" -regular : Normal DataNode startup (default).\n" +
" -rollback : Rollback a standard upgrade.\n" +
" -rollingupgrade rollback : Rollback a rolling upgrade operation.\n" +
" -rollback : Rollback a standard or rolling upgrade.\n" +
" Refer to HDFS documentation for the difference between standard\n" +
" and rolling upgrades.";

View File

@ -337,6 +337,33 @@ private synchronized void addStorageLocations(DataNode datanode,
}
}
/**
* Remove volumes from DataStorage.
* @param locations a collection of volumes.
*/
synchronized void removeVolumes(Collection<StorageLocation> locations) {
if (locations.isEmpty()) {
return;
}
Set<File> dataDirs = new HashSet<File>();
for (StorageLocation sl : locations) {
dataDirs.add(sl.getFile());
}
for (BlockPoolSliceStorage bpsStorage : this.bpStorageMap.values()) {
bpsStorage.removeVolumes(dataDirs);
}
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (dataDirs.contains(sd.getRoot())) {
it.remove();
}
}
}
/**
* Analyze storage directories.
* Recover from previous transitions if required.

View File

@ -97,6 +97,9 @@ public RollingLogs createRollingLogs(String bpid, String prefix
public void addVolumes(Collection<StorageLocation> volumes)
throws IOException;
/** Removes a collection of volumes from FsDataset. */
public void removeVolumes(Collection<StorageLocation> volumes);
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);

View File

@ -442,7 +442,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir,
loadRwr = false;
}
sc.close();
if (restartMeta.delete()) {
if (!restartMeta.delete()) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}

View File

@ -118,6 +118,24 @@ synchronized void addVolume(File volume) {
}
addExecutorForVolume(volume);
}
/**
* Stops AsyncDiskService for a volume.
* @param volume the root of the volume.
*/
synchronized void removeVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (executor == null) {
throw new RuntimeException("Can not find volume " + volume
+ " to remove.");
} else {
executor.shutdown();
executors.remove(volume);
}
}
synchronized long countPendingDeletions() {
long count = 0;

View File

@ -27,7 +27,14 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
@ -328,6 +335,51 @@ public synchronized void addVolumes(Collection<StorageLocation> volumes)
}
}
/**
* Removes a collection of volumes from FsDataset.
* @param volumes the root directories of the volumes.
*
* DataNode should call this function before calling
* {@link DataStorage#removeVolumes(java.util.Collection)}.
*/
@Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
Set<File> volumeSet = new HashSet<File>();
for (StorageLocation sl : volumes) {
volumeSet.add(sl.getFile());
}
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
if (volumeSet.contains(sd.getRoot())) {
String volume = sd.getRoot().toString();
LOG.info("Removing " + volume + " from FsDataset.");
this.volumes.removeVolume(volume);
storageMap.remove(sd.getStorageUuid());
asyncDiskService.removeVolume(sd.getCurrentDir());
// Removed all replica information for the blocks on the volume. Unlike
// updating the volumeMap in addVolume(), this operation does not scan
// disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<Block> blocks = new ArrayList<Block>();
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
it.hasNext(); ) {
ReplicaInfo block = it.next();
if (block.getVolume().getBasePath().equals(volume)) {
invalidate(bpid, block.getBlockId());
blocks.add(block);
it.remove();
}
}
// Delete blocks from the block scanner in batch.
datanode.getBlockScanner().deleteBlocks(bpid,
blocks.toArray(new Block[blocks.size()]));
}
}
}
}
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
@ -1465,6 +1517,28 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
}
}
/**
* Invalidate a block but does not delete the actual on-disk block file.
*
* It should only be used for decommissioning disks.
*
* @param bpid the block pool ID.
* @param blockId the ID of the block.
*/
public void invalidate(String bpid, long blockId) {
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
// The short-circuit registry is null in the unit tests, because the
// datanode is mock object.
if (datanode.getShortCircuitRegistry() != null) {
datanode.getShortCircuitRegistry().processBlockInvalidation(
new ExtendedBlockId(blockId, bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, blockId);
}
}
/**
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
*/

View File

@ -222,6 +222,25 @@ synchronized void addVolume(FsVolumeImpl newVolume) {
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
}
/**
* Dynamically remove volume to the list.
* @param volume the volume to be removed.
*/
synchronized void removeVolume(String volume) {
// Make a copy of volumes to remove one volume.
final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
if (fsVolume.getBasePath().equals(volume)) {
fsVolume.shutdown();
it.remove();
volumes = Collections.unmodifiableList(volumeList);
FsDatasetImpl.LOG.info("Removed volume: " + volume);
break;
}
}
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();

View File

@ -1073,6 +1073,7 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
}
if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&
!nn.isStandbyState() &&
noStaleStorages) {
return new FinalizeCommand(poolId);

View File

@ -206,7 +206,7 @@
<li>Restore the pre-upgrade release in all machines.</li>
<li>Start <em>NNs</em> with the
"<a href="#namenode_-rollingUpgrade"><code>-rollingUpgrade rollback</code></a>" option.</li>
<li>Start <em>DNs</em> normally.</li>
<li>Start <em>DNs</em> with the "<code>-rollback</code>" option.</li>
</ol></li>
</ul>

View File

@ -1131,6 +1131,11 @@ public FsVolumeSpi getVolume(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
@Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
throw new UnsupportedOperationException();
}
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) {

View File

@ -18,12 +18,20 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@ -35,25 +43,44 @@
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestFsDatasetImpl {
private static final String BASE_DIR =
System.getProperty("test.build.dir") + "/fsdatasetimpl";
new FileSystemTestHelper().getTestRootDir();
private static final int NUM_INIT_VOLUMES = 2;
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
// Use to generate storageUuid
private static final DataStorage dsForStorageUuid = new DataStorage(
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
private Configuration conf;
private DataStorage storage;
private DataBlockScanner scanner;
private FsDatasetImpl dataset;
private static Storage.StorageDirectory createStorageDirectory(File root) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
dsForStorageUuid.createStorageID(sd);
return sd;
}
private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) {
String loc = BASE_DIR + "/data" + i;
dirStrings.add(loc);
dirs.add(new Storage.StorageDirectory(new File(loc)));
File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(loc.toString());
loc.mkdirs();
dirs.add(createStorageDirectory(loc));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
@ -66,14 +93,19 @@ private static void createStorageDirs(DataStorage storage, Configuration conf,
public void setUp() throws IOException {
final DataNode datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
Configuration conf = new Configuration();
scanner = Mockito.mock(DataBlockScanner.class);
this.conf = new Configuration();
final DNConf dnConf = new DNConf(conf);
when(datanode.getConf()).thenReturn(conf);
when(datanode.getDnConf()).thenReturn(dnConf);
when(datanode.getBlockScanner()).thenReturn(scanner);
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
for (String bpid : BLOCK_POOL_IDS) {
dataset.addBlockPool(bpid, conf);
}
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
assertEquals(0, dataset.getNumFailedVolumes());
@ -89,15 +121,63 @@ public void testAddVolumes() throws IOException {
String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path));
when(storage.getStorageDir(numExistingVolumes + i))
.thenReturn(new Storage.StorageDirectory(new File(path)));
.thenReturn(createStorageDirectory(new File(path)));
}
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
dataset.addVolumes(newLocations);
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
for (int i = 0; i < numNewVolumes; i++) {
assertEquals(newLocations.get(i).getFile().getPath(),
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
}
}
@Test
public void testRemoveVolumes() throws IOException {
// Feed FsDataset with block metadata.
final int NUM_BLOCKS = 100;
for (int i = 0; i < NUM_BLOCKS; i++) {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
dataset.createRbw(StorageType.DEFAULT, eb, false);
}
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0];
List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
dataset.removeVolumes(volumesToRemove);
int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.",
expectedNumVolumes, dataset.getVolumes().size());
assertEquals("The volume has been removed from the storageMap.",
expectedNumVolumes, dataset.storageMap.size());
try {
dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
new Runnable() {
@Override
public void run() {}
});
fail("Expect RuntimeException: the volume has been removed from the "
+ "AsyncDiskService.");
} catch (RuntimeException e) {
GenericTestUtils.assertExceptionContains("Cannot find root", e);
}
int totalNumReplicas = 0;
for (String bpid : dataset.volumeMap.getBlockPoolList()) {
totalNumReplicas += dataset.volumeMap.size(bpid);
}
assertEquals("The replica infos on this volume has been removed from the "
+ "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
totalNumReplicas);
// Verify that every BlockPool deletes the removed blocks from the volume.
verify(scanner, times(BLOCK_POOL_IDS.length))
.deleteBlocks(anyString(), any(Block[].class));
}
}

View File

@ -264,6 +264,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6051. Fix typos in log messages. (Ray Chiang via cdouglas)
MAPREDUCE-5931. Validate SleepJob command line parameters (Gera Shegalov
via jlowe)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1,275 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Dummy class for testing MR framefork. Sleeps for a defined period
* of time in mapper and reducer. Generates fake input for map / reduce
* jobs. Note that generated number of input pairs is in the order
* of <code>numMappers * mapSleepTime / 100</code>, so the job uses
* some disk space.
*/
public class SleepJob extends Configured implements Tool {
public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
public static String REDUCE_SLEEP_COUNT =
"mapreduce.sleepjob.reduce.sleep.count";
public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
public static String REDUCE_SLEEP_TIME =
"mapreduce.sleepjob.reduce.sleep.time";
public static class SleepJobPartitioner extends
Partitioner<IntWritable, NullWritable> {
public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
return k.get() % numPartitions;
}
}
public static class EmptySplit extends InputSplit implements Writable {
public void write(DataOutput out) throws IOException { }
public void readFields(DataInput in) throws IOException { }
public long getLength() { return 0L; }
public String[] getLocations() { return new String[0]; }
}
public static class SleepInputFormat
extends InputFormat<IntWritable,IntWritable> {
public List<InputSplit> getSplits(JobContext jobContext) {
List<InputSplit> ret = new ArrayList<InputSplit>();
int numSplits = jobContext.getConfiguration().
getInt(MRJobConfig.NUM_MAPS, 1);
for (int i = 0; i < numSplits; ++i) {
ret.add(new EmptySplit());
}
return ret;
}
public RecordReader<IntWritable,IntWritable> createRecordReader(
InputSplit ignored, TaskAttemptContext taskContext)
throws IOException {
Configuration conf = taskContext.getConfiguration();
final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
if (count < 0) throw new IOException("Invalid map count: " + count);
final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
if (redcount < 0)
throw new IOException("Invalid reduce count: " + redcount);
final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
return new RecordReader<IntWritable,IntWritable>() {
private int records = 0;
private int emitCount = 0;
private IntWritable key = null;
private IntWritable value = null;
public void initialize(InputSplit split, TaskAttemptContext context) {
}
public boolean nextKeyValue()
throws IOException {
if (count == 0) {
return false;
}
key = new IntWritable();
key.set(emitCount);
int emit = emitPerMapTask / count;
if ((emitPerMapTask) % count > records) {
++emit;
}
emitCount += emit;
value = new IntWritable();
value.set(emit);
return records++ < count;
}
public IntWritable getCurrentKey() { return key; }
public IntWritable getCurrentValue() { return value; }
public void close() throws IOException { }
public float getProgress() throws IOException {
return count == 0 ? 100 : records / ((float)count);
}
};
}
}
public static class SleepMapper
extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
private long mapSleepDuration = 100;
private int mapSleepCount = 1;
private int count = 0;
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.mapSleepCount =
conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
this.mapSleepDuration = mapSleepCount == 0 ? 0 :
conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
}
public void map(IntWritable key, IntWritable value, Context context
) throws IOException, InterruptedException {
//it is expected that every map processes mapSleepCount number of records.
try {
context.setStatus("Sleeping... (" +
(mapSleepDuration * (mapSleepCount - count)) + ") ms left");
Thread.sleep(mapSleepDuration);
}
catch (InterruptedException ex) {
throw (IOException)new IOException(
"Interrupted while sleeping").initCause(ex);
}
++count;
// output reduceSleepCount * numReduce number of random values, so that
// each reducer will get reduceSleepCount number of keys.
int k = key.get();
for (int i = 0; i < value.get(); ++i) {
context.write(new IntWritable(k + i), NullWritable.get());
}
}
}
public static class SleepReducer
extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
private long reduceSleepDuration = 100;
private int reduceSleepCount = 1;
private int count = 0;
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.reduceSleepCount =
conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
}
public void reduce(IntWritable key, Iterable<NullWritable> values,
Context context)
throws IOException {
try {
context.setStatus("Sleeping... (" +
(reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
Thread.sleep(reduceSleepDuration);
}
catch (InterruptedException ex) {
throw (IOException)new IOException(
"Interrupted while sleeping").initCause(ex);
}
count++;
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
System.exit(res);
}
public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount)
throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(SleepJob.class);
job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(SleepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(SleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public int run(String[] args) throws Exception {
if(args.length < 1) {
System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]");
ToolRunner.printGenericCommandUsage(System.err);
return 2;
}
int numMapper = 1, numReducer = 1;
long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
int mapSleepCount = 1, reduceSleepCount = 1;
for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) {
numMapper = Integer.parseInt(args[++i]);
}
else if(args[i].equals("-r")) {
numReducer = Integer.parseInt(args[++i]);
}
else if(args[i].equals("-mt")) {
mapSleepTime = Long.parseLong(args[++i]);
}
else if(args[i].equals("-rt")) {
reduceSleepTime = Long.parseLong(args[++i]);
}
else if (args[i].equals("-recordt")) {
recSleepTime = Long.parseLong(args[++i]);
}
}
// sleep for *SleepTime duration in Task by recSleepTime per record
mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
Job job = createJob(numMapper, numReducer, mapSleepTime,
mapSleepCount, reduceSleepTime, reduceSleepCount);
return job.waitForCompletion(true) ? 0 : 1;
}
}

View File

@ -224,11 +224,7 @@ public Job createJob(int numMapper, int numReducer,
public int run(String[] args) throws Exception {
if(args.length < 1) {
System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]");
ToolRunner.printGenericCommandUsage(System.err);
return 2;
return printUsage("number of arguments must be > 0");
}
int numMapper = 1, numReducer = 1;
@ -238,18 +234,34 @@ public int run(String[] args) throws Exception {
for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) {
numMapper = Integer.parseInt(args[++i]);
if (numMapper < 0) {
return printUsage(numMapper + ": numMapper must be >= 0");
}
}
else if(args[i].equals("-r")) {
numReducer = Integer.parseInt(args[++i]);
if (numReducer < 0) {
return printUsage(numReducer + ": numReducer must be >= 0");
}
}
else if(args[i].equals("-mt")) {
mapSleepTime = Long.parseLong(args[++i]);
if (mapSleepTime < 0) {
return printUsage(mapSleepTime + ": mapSleepTime must be >= 0");
}
}
else if(args[i].equals("-rt")) {
reduceSleepTime = Long.parseLong(args[++i]);
if (reduceSleepTime < 0) {
return printUsage(
reduceSleepTime + ": reduceSleepTime must be >= 0");
}
}
else if (args[i].equals("-recordt")) {
recSleepTime = Long.parseLong(args[++i]);
if (recSleepTime < 0) {
return printUsage(recSleepTime + ": recordSleepTime must be >= 0");
}
}
}
@ -261,4 +273,14 @@ else if (args[i].equals("-recordt")) {
return job.waitForCompletion(true) ? 0 : 1;
}
private int printUsage(String error) {
if (error != null) {
System.err.println("ERROR: " + error);
}
System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]");
ToolRunner.printGenericCommandUsage(System.err);
return 2;
}
}

View File

@ -25,7 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

View File

@ -40,8 +40,8 @@
import org.apache.hadoop.FailingMapper;
import org.apache.hadoop.RandomTextWriterJob;
import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.SleepJob.SleepMapper;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob.SleepMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;

View File

@ -28,7 +28,7 @@
import org.apache.avro.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

View File

@ -29,7 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;

View File

@ -67,6 +67,8 @@
<protoc.path>${env.HADOOP_PROTOC_PATH}</protoc.path>
<zookeeper.version>3.4.6</zookeeper.version>
<tomcat.version>6.0.41</tomcat.version>
</properties>
<dependencyManagement>
@ -398,6 +400,16 @@
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>7.0.55</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-logging-juli</artifactId>
<version>7.0.55</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>

View File

@ -160,6 +160,9 @@ Release 2.6.0 - UNRELEASED
YARN-2182. Updated ContainerId#toString() to append RM Epoch number.
(Tsuyoshi OZAWA via jianhe)
YARN-2406. Move RM recovery related proto to
yarn_server_resourcemanager_recovery.proto. (Tsuyoshi Ozawa via jianhe)
OPTIMIZATIONS
BUG FIXES
@ -246,6 +249,20 @@ Release 2.6.0 - UNRELEASED
YARN-2035. FileSystemApplicationHistoryStore should not make working dir
when it already exists. (Jonathan Eagles via zjshen)
YARN-2405. NPE in FairSchedulerAppsBlock. (Tsuyoshi Ozawa via kasha)
YARN-2449. Fixed the bug that TimelineAuthenticationFilterInitializer
is not automatically added when hadoop.http.filter.initializers is not
configured. (Varun Vasudev via zjshen)
YARN-2450. Fix typos in log messages. (Ray Chiang via hitesh)
YARN-2447. RM web service app submission doesn't pass secrets correctly.
(Varun Vasudev via jianhe)
YARN-2462. TestNodeManagerResync#testBlockNewContainerRequestsOnStartAndResync
should have a test timeout (Eric Payne via jlowe)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -75,64 +75,6 @@ message UpdateNodeResourceRequestProto {
message UpdateNodeResourceResponseProto {
}
////////////////////////////////////////////////////////////////////////
////// RM recovery related records /////////////////////////////////////
////////////////////////////////////////////////////////////////////////
enum RMAppAttemptStateProto {
RMATTEMPT_NEW = 1;
RMATTEMPT_SUBMITTED = 2;
RMATTEMPT_SCHEDULED = 3;
RMATTEMPT_ALLOCATED = 4;
RMATTEMPT_LAUNCHED = 5;
RMATTEMPT_FAILED = 6;
RMATTEMPT_RUNNING = 7;
RMATTEMPT_FINISHING = 8;
RMATTEMPT_FINISHED = 9;
RMATTEMPT_KILLED = 10;
RMATTEMPT_ALLOCATED_SAVING = 11;
RMATTEMPT_LAUNCHED_UNMANAGED_SAVING = 12;
RMATTEMPT_RECOVERED = 13;
RMATTEMPT_FINAL_SAVING = 14;
}
enum RMAppStateProto {
RMAPP_NEW = 1;
RMAPP_NEW_SAVING = 2;
RMAPP_SUBMITTED = 3;
RMAPP_ACCEPTED = 4;
RMAPP_RUNNING = 5;
RMAPP_FINAL_SAVING = 6;
RMAPP_FINISHING = 7;
RMAPP_FINISHED = 8;
RMAPP_FAILED = 9;
RMAPP_KILLED = 10;
}
message ApplicationStateDataProto {
optional int64 submit_time = 1;
optional ApplicationSubmissionContextProto application_submission_context = 2;
optional string user = 3;
optional int64 start_time = 4;
optional RMAppStateProto application_state = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 finish_time = 7;
}
message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2;
optional bytes app_attempt_tokens = 3;
optional RMAppAttemptStateProto app_attempt_state = 4;
optional string final_tracking_url = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
}
message EpochProto {
optional int64 epoch = 1;
}
//////////////////////////////////////////////////////////////////
///////////// RM Failover related records ////////////////////////

View File

@ -519,7 +519,7 @@ public void run() throws YarnException, IOException {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START);
} catch (Exception e) {
LOG.error("App Attempt start event coud not be pulished for "
LOG.error("App Attempt start event could not be published for "
+ appAttemptID.toString(), e);
}
@ -616,7 +616,7 @@ public void run() throws YarnException, IOException {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END);
} catch (Exception e) {
LOG.error("App Attempt start event coud not be pulished for "
LOG.error("App Attempt start event could not be published for "
+ appAttemptID.toString(), e);
}
}
@ -726,7 +726,7 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
try {
publishContainerEndEvent(timelineClient, containerStatus);
} catch (Exception e) {
LOG.error("Container start event could not be pulished for "
LOG.error("Container start event could not be published for "
+ containerStatus.getContainerId().toString(), e);
}
}
@ -847,7 +847,7 @@ public void onContainerStarted(ContainerId containerId,
ApplicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container);
} catch (Exception e) {
LOG.error("Container start event coud not be pulished for "
LOG.error("Container start event could not be published for "
+ container.getId().toString(), e);
}
}

View File

@ -197,7 +197,7 @@ public static void main(String[] args) {
}
result = client.run();
} catch (Throwable t) {
LOG.fatal("Error running CLient", t);
LOG.fatal("Error running Client", t);
System.exit(1);
}
if (result) {

View File

@ -197,6 +197,7 @@ private void startWebApp() {
// the customized filter will be loaded by the timeline server to do Kerberos
// + DT authentication.
String initializers = conf.get("hadoop.http.filter.initializers");
boolean modifiedInitialiers = false;
initializers =
initializers == null || initializers.length() == 0 ? "" : initializers;
@ -206,6 +207,7 @@ private void startWebApp() {
initializers =
TimelineAuthenticationFilterInitializer.class.getName() + ","
+ initializers;
modifiedInitialiers = true;
}
String[] parts = initializers.split(",");
@ -214,13 +216,14 @@ private void startWebApp() {
filterInitializer = filterInitializer.trim();
if (filterInitializer.equals(AuthenticationFilterInitializer.class
.getName())) {
modifiedInitialiers = true;
continue;
}
target.add(filterInitializer);
}
String actualInitializers =
org.apache.commons.lang.StringUtils.join(target, ",");
if (!actualInitializers.equals(initializers)) {
if (modifiedInitialiers) {
conf.set("hadoop.http.filter.initializers", actualInitializers);
}
String bindAddress = WebAppUtils.getWebAppBindURL(conf,

View File

@ -23,6 +23,7 @@
import static org.junit.Assert.fail;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
@ -33,6 +34,9 @@
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TestApplicationHistoryServer {
ApplicationHistoryServer historyServer = null;
@ -75,23 +79,32 @@ public void testLaunch() throws Exception {
@Test(timeout = 50000)
public void testFilteOverrides() throws Exception {
String[] filterInitializers =
{
AuthenticationFilterInitializer.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName(),
AuthenticationFilterInitializer.class.getName() + ","
+ TimelineAuthenticationFilterInitializer.class.getName(),
AuthenticationFilterInitializer.class.getName() + ", "
+ TimelineAuthenticationFilterInitializer.class.getName() };
for (String filterInitializer : filterInitializers) {
HashMap<String, String> driver = new HashMap<String, String>();
driver.put("", TimelineAuthenticationFilterInitializer.class.getName());
driver.put(StaticUserWebFilter.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName() + ","
+ StaticUserWebFilter.class.getName());
driver.put(AuthenticationFilterInitializer.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName());
driver.put(TimelineAuthenticationFilterInitializer.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName());
driver.put(AuthenticationFilterInitializer.class.getName() + ","
+ TimelineAuthenticationFilterInitializer.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName());
driver.put(AuthenticationFilterInitializer.class.getName() + ", "
+ TimelineAuthenticationFilterInitializer.class.getName(),
TimelineAuthenticationFilterInitializer.class.getName());
for (Map.Entry<String, String> entry : driver.entrySet()) {
String filterInitializer = entry.getKey();
String expectedValue = entry.getValue();
historyServer = new ApplicationHistoryServer();
Configuration config = new YarnConfiguration();
config.set("hadoop.http.filter.initializers", filterInitializer);
historyServer.init(config);
historyServer.start();
Configuration tmp = historyServer.getConfig();
assertEquals(TimelineAuthenticationFilterInitializer.class.getName(),
tmp.get("hadoop.http.filter.initializers"));
assertEquals(expectedValue, tmp.get("hadoop.http.filter.initializers"));
historyServer.stop();
AHSWebApp.resetInstance();
}

View File

@ -801,7 +801,7 @@ public void run() {
try {
Path local = completed.get();
if (null == assoc) {
LOG.error("Localized unkonwn resource to " + completed);
LOG.error("Localized unknown resource to " + completed);
// TODO delete
return;
}
@ -810,7 +810,7 @@ public void run() {
.getDU(new File(local.toUri()))));
assoc.getResource().unlock();
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
LOG.info("Failed to download resource " + assoc.getResource(),
e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceFailedLocalizationEvent(req,

View File

@ -159,7 +159,7 @@ protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm,
// This test tests new container requests are blocked when NM starts from
// scratch until it register with RM AND while NM is resyncing with RM
@SuppressWarnings("unchecked")
@Test
@Test(timeout=60000)
public void testBlockNewContainerRequestsOnStartAndResync()
throws IOException, InterruptedException, YarnException {
NodeManager nm = new TestNodeManager2();

View File

@ -46,9 +46,9 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@ -300,7 +300,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
LOG.info("Done Loading applications from FS state store");
LOG.info("Done loading applications from FS state store");
} catch (Exception e) {
LOG.error("Failed to load state.", e);
throw e;

View File

@ -46,9 +46,9 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.records.Version;
@ -608,7 +608,7 @@ private void loadApplicationAttemptState(ApplicationState appState,
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
LOG.debug("Done Loading applications from ZK state store");
LOG.debug("Done loading applications from ZK state store");
}
@Override

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.util.Records;
/**

View File

@ -27,9 +27,9 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppAttemptStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;

View File

@ -20,9 +20,9 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProtoOrBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;

View File

@ -289,7 +289,7 @@ public void run() {
tokenWithConf = queue.take();
final TokenWithConf current = tokenWithConf;
if (LOG.isDebugEnabled()) {
LOG.debug("Canceling token " + tokenWithConf.token.getService());
LOG.debug("Cancelling token " + tokenWithConf.token.getService());
}
// need to use doAs so that http can find the kerberos tgt
UserGroupInformation.getLoginUser()

View File

@ -110,6 +110,10 @@ public class FairSchedulerAppsBlock extends HtmlBlock {
String percent = String.format("%.1f", appInfo.getProgress());
ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
int fairShare = fsinfo.getAppFairShare(attemptId);
if (fairShare == FairSchedulerInfo.INVALID_FAIR_SHARE) {
// FairScheduler#applications don't have the entry. Skip it.
continue;
}
//AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js
appsTableData.append("[\"<a href='")
.append(url("app", appInfo.getAppId())).append("'>")

View File

@ -1061,7 +1061,7 @@ private Credentials createCredentials(CredentialsInfo credentials) {
token.decodeFromUrlString(entry.getValue());
ret.addToken(alias, token);
}
for (Map.Entry<String, String> entry : credentials.getTokens().entrySet()) {
for (Map.Entry<String, String> entry : credentials.getSecrets().entrySet()) {
Text alias = new Text(entry.getKey());
Base64 decoder = new Base64(0, null, true);
byte[] secret = decoder.decode(entry.getValue());

View File

@ -25,12 +25,14 @@
import javax.xml.bind.annotation.XmlType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@XmlRootElement(name = "fairScheduler")
@XmlType(name = "fairScheduler")
@XmlAccessorType(XmlAccessType.FIELD)
public class FairSchedulerInfo extends SchedulerInfo {
public static final int INVALID_FAIR_SHARE = -1;
private FairSchedulerQueueInfo rootQueue;
@XmlTransient
@ -44,9 +46,18 @@ public FairSchedulerInfo(FairScheduler fs) {
rootQueue = new FairSchedulerQueueInfo(scheduler.getQueueManager().
getRootQueue(), scheduler);
}
/**
* Get the fair share assigned to the appAttemptId.
* @param appAttemptId
* @return The fair share assigned to the appAttemptId,
* <code>FairSchedulerInfo#INVALID_FAIR_SHARE</code> if the scheduler does
* not know about this application attempt.
*/
public int getAppFairShare(ApplicationAttemptId appAttemptId) {
return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory();
FSAppAttempt fsAppAttempt = scheduler.getSchedulerApp(appAttemptId);
return fsAppAttempt == null ?
INVALID_FAIR_SHARE : fsAppAttempt.getFairShare().getMemory();
}
public FairSchedulerQueueInfo getRootQueueInfo() {

View File

@ -23,6 +23,66 @@ option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_server_common_protos.proto";
import "yarn_protos.proto";
////////////////////////////////////////////////////////////////////////
////// RM recovery related records /////////////////////////////////////
////////////////////////////////////////////////////////////////////////
enum RMAppAttemptStateProto {
RMATTEMPT_NEW = 1;
RMATTEMPT_SUBMITTED = 2;
RMATTEMPT_SCHEDULED = 3;
RMATTEMPT_ALLOCATED = 4;
RMATTEMPT_LAUNCHED = 5;
RMATTEMPT_FAILED = 6;
RMATTEMPT_RUNNING = 7;
RMATTEMPT_FINISHING = 8;
RMATTEMPT_FINISHED = 9;
RMATTEMPT_KILLED = 10;
RMATTEMPT_ALLOCATED_SAVING = 11;
RMATTEMPT_LAUNCHED_UNMANAGED_SAVING = 12;
RMATTEMPT_RECOVERED = 13;
RMATTEMPT_FINAL_SAVING = 14;
}
enum RMAppStateProto {
RMAPP_NEW = 1;
RMAPP_NEW_SAVING = 2;
RMAPP_SUBMITTED = 3;
RMAPP_ACCEPTED = 4;
RMAPP_RUNNING = 5;
RMAPP_FINAL_SAVING = 6;
RMAPP_FINISHING = 7;
RMAPP_FINISHED = 8;
RMAPP_FAILED = 9;
RMAPP_KILLED = 10;
}
message ApplicationStateDataProto {
optional int64 submit_time = 1;
optional ApplicationSubmissionContextProto application_submission_context = 2;
optional string user = 3;
optional int64 start_time = 4;
optional RMAppStateProto application_state = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 finish_time = 7;
}
message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2;
optional bytes app_attempt_tokens = 3;
optional RMAppAttemptStateProto app_attempt_state = 4;
optional string final_tracking_url = 5;
optional string diagnostics = 6 [default = "N/A"];
optional int64 start_time = 7;
optional FinalApplicationStatusProto final_application_status = 8;
optional int32 am_container_exit_status = 9 [default = -1000];
}
message EpochProto {
optional int64 epoch = 1;
}
message AMRMTokenSecretManagerStateProto {
optional MasterKeyProto current_master_key = 1;

View File

@ -22,20 +22,29 @@
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@ -75,12 +84,67 @@ public void configure(Binder binder) {
WebAppTests.flushOutput(injector);
}
/**
* Testing inconsistent state between AbstractYarnScheduler#applications and
* RMContext#applications
*/
@Test
public void testFairSchedulerWebAppPageInInconsistentState() {
List<RMAppState> appStates = Arrays.asList(
RMAppState.NEW,
RMAppState.NEW_SAVING,
RMAppState.SUBMITTED,
RMAppState.RUNNING,
RMAppState.FINAL_SAVING,
RMAppState.ACCEPTED,
RMAppState.FINISHED
);
final RMContext rmContext = mockRMContext(appStates);
Injector injector = WebAppTests.createMockInjector(RMContext.class,
rmContext,
new Module() {
@Override
public void configure(Binder binder) {
try {
ResourceManager mockRmWithFairScheduler =
mockRmWithApps(rmContext);
binder.bind(ResourceManager.class).toInstance
(mockRmWithFairScheduler);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
FairSchedulerPage fsViewInstance =
injector.getInstance(FairSchedulerPage.class);
try {
fsViewInstance.render();
} catch (Exception e) {
Assert.fail("Failed to render FairSchedulerPage: " +
StringUtils.stringifyException(e));
}
WebAppTests.flushOutput(injector);
}
private static RMContext mockRMContext(List<RMAppState> states) {
final ConcurrentMap<ApplicationId, RMApp> applicationsMaps = Maps
.newConcurrentMap();
int i = 0;
for (RMAppState state : states) {
MockRMApp app = new MockRMApp(i, i, state);
MockRMApp app = new MockRMApp(i, i, state) {
@Override
public RMAppMetrics getRMAppMetrics() {
return new RMAppMetrics(Resource.newInstance(0, 0), 0, 0);
}
@Override
public YarnApplicationState createApplicationState() {
return YarnApplicationState.ACCEPTED;
}
};
RMAppAttempt attempt = mock(RMAppAttempt.class);
app.setCurrentAppAttempt(attempt);
applicationsMaps.put(app.getApplicationId(), app);
i++;
}
@ -113,4 +177,34 @@ null, new RMContainerTokenSecretManager(conf),
fs.init(conf);
return fs;
}
private static ResourceManager mockRmWithApps(RMContext rmContext) throws
IOException {
ResourceManager rm = mock(ResourceManager.class);
ResourceScheduler rs = mockFairSchedulerWithoutApps(rmContext);
when(rm.getResourceScheduler()).thenReturn(rs);
when(rm.getRMContext()).thenReturn(rmContext);
return rm;
}
private static FairScheduler mockFairSchedulerWithoutApps(RMContext rmContext)
throws IOException {
FairScheduler fs = new FairScheduler() {
@Override
public FSAppAttempt getSchedulerApp(ApplicationAttemptId
applicationAttemptId) {
return null ;
}
@Override
public FSAppAttempt getApplicationAttempt(ApplicationAttemptId
applicationAttemptId) {
return null;
}
};
FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
fs.setRMContext(rmContext);
fs.init(conf);
return fs;
}
}

View File

@ -22,9 +22,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@ -47,6 +45,9 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -77,6 +78,7 @@
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -684,7 +686,8 @@ public void testAppSubmit(String acceptMedia, String contentMedia)
CredentialsInfo credentials = new CredentialsInfo();
HashMap<String, String> tokens = new HashMap<String, String>();
HashMap<String, String> secrets = new HashMap<String, String>();
secrets.put("secret1", Base64.encodeBase64URLSafeString("secret1".getBytes("UTF8")));
secrets.put("secret1", Base64.encodeBase64String(
"mysecret".getBytes("UTF8")));
credentials.setSecrets(secrets);
credentials.setTokens(tokens);
ApplicationSubmissionContextInfo appInfo = new ApplicationSubmissionContextInfo();
@ -757,6 +760,16 @@ public void testAppSubmit(String acceptMedia, String contentMedia)
assertEquals(y.getType(), exampleLR.getType());
assertEquals(y.getPattern(), exampleLR.getPattern());
assertEquals(y.getVisibility(), exampleLR.getVisibility());
Credentials cs = new Credentials();
ByteArrayInputStream str =
new ByteArrayInputStream(app.getApplicationSubmissionContext()
.getAMContainerSpec().getTokens().array());
DataInputStream di = new DataInputStream(str);
cs.readTokenStorageStream(di);
Text key = new Text("secret1");
assertTrue("Secrets missing from credentials object", cs
.getAllSecretKeys().contains(key));
assertEquals("mysecret", new String(cs.getSecretKey(key), "UTF-8"));
response =
this.constructWebResource("apps", appId).accept(acceptMedia)