Merge trunk to HDFS-4685.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1566988 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-11 06:00:56 +00:00
commit 3bf2f04bac
47 changed files with 1048 additions and 269 deletions

View File

@ -312,6 +312,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10295. Allow distcp to automatically identify the checksum type of
source files and use it for the target. (jing9 and Laurent Goujon)
HADOOP-10333. Fix grammatical error in overview.html document.
(René Nyffenegger via suresh)
OPTIMIZATIONS
BUG FIXES
@ -328,6 +331,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10330. TestFrameDecoder fails if it cannot bind port 12345.
(Arpit Agarwal)
HADOOP-10326. M/R jobs can not access S3 if Kerberos is enabled. (bc Wong
via atm)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -443,6 +443,12 @@ public long getDefaultBlockSize() {
return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
// diagnostic methods
void dump() throws IOException {

View File

@ -733,4 +733,10 @@ public void setWorkingDirectory(Path newDir) {
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
}

View File

@ -66,6 +66,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
@ -454,9 +455,10 @@ public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {
* Refresh the service authorization ACL for the service handled by this server
* using the specified Configuration.
*/
public void refreshServiceAclWithConfigration(Configuration conf,
@Private
public void refreshServiceAclWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
serviceAuthorizationManager.refreshWithConfiguration(conf, provider);
serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);
}
/**
* Returns a handle to the serviceAuthorizationManager (required in tests)

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -122,10 +123,11 @@ public synchronized void refresh(Configuration conf,
// Make a copy of the original config, and load the policy file
Configuration policyConf = new Configuration(conf);
policyConf.addResource(policyFile);
refreshWithConfiguration(policyConf, provider);
refreshWithLoadedConfiguration(policyConf, provider);
}
public synchronized void refreshWithConfiguration(Configuration conf,
@Private
public synchronized void refreshWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();

View File

@ -57,7 +57,7 @@ <h3>Platforms</h3>
<ul>
<li>
Hadoop was been demonstrated on GNU/Linux clusters with 2000 nodes.
Hadoop has been demonstrated on GNU/Linux clusters with more than 4000 nodes.
</li>
<li>
Windows is also a supported platform.

View File

@ -54,5 +54,10 @@ public void testBlockSize() throws Exception {
assertEquals("Double default block size", newBlockSize,
fs.getFileStatus(file).getBlockSize());
}
public void testCanonicalName() throws Exception {
assertNull("s3 doesn't support security token and shouldn't have canonical name",
fs.getCanonicalServiceName());
}
}

View File

@ -48,7 +48,12 @@ protected void tearDown() throws Exception {
store.purge("test");
super.tearDown();
}
public void testCanonicalName() throws Exception {
assertNull("s3n doesn't support security token and shouldn't have canonical name",
fs.getCanonicalServiceName());
}
public void testListStatusForRoot() throws Exception {
FileStatus[] paths = fs.listStatus(path("/"));
assertEquals("Root directory is not empty; ", 0, paths.length);
@ -60,7 +65,7 @@ public void testListStatusForRoot() throws Exception {
assertEquals(1, paths.length);
assertEquals(path("/test"), paths[0].getPath());
}
public void testNoTrailingBackslashOnBucket() throws Exception {
assertTrue(fs.getFileStatus(new Path(fs.getUri().toString())).isDirectory());
}

View File

@ -545,7 +545,8 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
return new READLINK3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
}
if (MAX_READ_TRANSFER_SIZE < target.getBytes().length) {
return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr, null);
return new READLINK3Response(Nfs3Status.NFS3ERR_IO, postOpAttr,
new byte[0]);
}
return new READLINK3Response(Nfs3Status.NFS3_OK, postOpAttr,

View File

@ -335,6 +335,9 @@ Trunk (Unreleased)
HDFS-5911. The id of a CacheDirective instance does not get serialized in
the protobuf-fsimage. (Haohui Mai via jing9)
HDFS-5915. Refactor FSImageFormatProtobuf to simplify cross section reads.
(Haohui Mai via cnauroth)
Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -360,6 +363,8 @@ Release 2.4.0 - UNRELEASED
HDFS-4911. Reduce PeerCache timeout to be commensurate with
dfs.datanode.socket.reuse.keepalive (cmccabe)
HDFS-4370. Fix typo Blanacer in DataNode. (Chu Tong via shv)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@ -402,6 +407,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5900. Cannot set cache pool limit of "unlimited" via CacheAdmin.
(wang)
HDFS-5886. Potential null pointer deference in RpcProgramNfs3#readlink()
(brandonli)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -940,6 +948,12 @@ Release 2.3.0 - UNRELEASED
HDFS-5873. dfs.http.policy should have higher precedence over dfs.https.enable.
(Haohui Mai via jing9)
HDFS-5837. dfs.namenode.replication.considerLoad should consider
decommissioned nodes. (Tao Luo via shv)
HDFS-5921. Cannot browse file system via NN web UI if any directory has
the sticky bit set. (atm)
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
HDFS-4985. Add storage type to the protocol and expose it in block report

View File

@ -633,9 +633,11 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
// check the communication traffic of the target machine
if (considerLoad) {
double avgLoad = 0;
int size = clusterMap.getNumOfLeaves();
if (size != 0 && stats != null) {
avgLoad = (double)stats.getTotalLoad()/size;
if (stats != null) {
int size = stats.getNumDatanodesInService();
if (size != 0) {
avgLoad = (double)stats.getTotalLoad()/size;
}
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
logNodeIsNotChosen(storage, "the node is too busy ");

View File

@ -2494,7 +2494,7 @@ public void clearAllBlockSecretKeys() {
/**
* Get current value of the max balancer bandwidth in bytes per second.
*
* @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
* @return Balancer bandwidth in bytes per second for this datanode.
*/
public Long getBalancerBandwidth() {
DataXceiverServer dxcs =

View File

@ -42,6 +42,12 @@ public interface FSClusterStats {
* for writing targets, and false otherwise.
*/
public boolean isAvoidingStaleDataNodesForWrite();
/**
* Indicates number of datanodes that are in service.
* @return Number of datanodes that are both alive and not decommissioned.
*/
public int getNumDatanodesInService();
}

View File

@ -38,7 +38,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.StringMap;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
@ -208,7 +208,7 @@ private INode loadINode(INodeSection.INode n) {
case FILE:
return loadINodeFile(n);
case DIRECTORY:
return loadINodeDirectory(n, parent.getStringTable());
return loadINodeDirectory(n, parent.getLoaderContext().getStringTable());
case SYMLINK:
return loadINodeSymlink(n);
default:
@ -228,7 +228,7 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
}
final PermissionStatus permissions = loadPermission(f.getPermission(),
parent.getStringTable());
parent.getLoaderContext().getStringTable());
final INodeFile file = new INodeFile(n.getId(),
n.getName().toByteArray(), permissions, f.getModificationTime(),
@ -253,13 +253,14 @@ private INodeSymlink loadINodeSymlink(INodeSection.INode n) {
assert n.getType() == INodeSection.INode.Type.SYMLINK;
INodeSection.INodeSymlink s = n.getSymlink();
final PermissionStatus permissions = loadPermission(s.getPermission(),
parent.getStringTable());
parent.getLoaderContext().getStringTable());
return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
0, 0, s.getTarget().toStringUtf8());
}
private void loadRootINode(INodeSection.INode p) {
INodeDirectory root = loadINodeDirectory(p, parent.getStringTable());
INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext()
.getStringTable());
final Quota.Counts q = root.getQuotaCounts();
final long nsQuota = q.get(Quota.NAMESPACE);
final long dsQuota = q.get(Quota.DISKSPACE);
@ -273,16 +274,17 @@ private void loadRootINode(INodeSection.INode p) {
public final static class Saver {
private static long buildPermissionStatus(INodeAttributes n,
final StringMap stringMap) {
long userId = stringMap.getStringId(n.getUserName());
long groupId = stringMap.getStringId(n.getGroupName());
final SaverContext.DeduplicationMap<String> stringMap) {
long userId = stringMap.getId(n.getUserName());
long groupId = stringMap.getId(n.getGroupName());
return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
| ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
| n.getFsPermissionShort();
}
public static INodeSection.INodeFile.Builder buildINodeFile(
INodeFileAttributes file, final StringMap stringMap) {
INodeFileAttributes file,
final SaverContext.DeduplicationMap<String> stringMap) {
INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
.setAccessTime(file.getAccessTime())
.setModificationTime(file.getModificationTime())
@ -293,7 +295,8 @@ public static INodeSection.INodeFile.Builder buildINodeFile(
}
public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
INodeDirectoryAttributes dir, final StringMap stringMap) {
INodeDirectoryAttributes dir,
final SaverContext.DeduplicationMap<String> stringMap) {
Quota.Counts quota = dir.getQuotaCounts();
INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
.newBuilder().setModificationTime(dir.getModificationTime())
@ -416,7 +419,7 @@ private void save(OutputStream out, INode n) throws IOException {
private void save(OutputStream out, INodeDirectory n) throws IOException {
INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
parent.getStringMap());
parent.getSaverContext().getStringMap());
INodeSection.INode r = buildINodeCommon(n)
.setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
r.writeDelimitedTo(out);
@ -424,7 +427,7 @@ private void save(OutputStream out, INodeDirectory n) throws IOException {
private void save(OutputStream out, INodeFile n) throws IOException {
INodeSection.INodeFile.Builder b = buildINodeFile(n,
parent.getStringMap());
parent.getSaverContext().getStringMap());
for (Block block : n.getBlocks()) {
b.addBlocks(PBHelper.convert(block));
@ -447,7 +450,7 @@ private void save(OutputStream out, INodeFile n) throws IOException {
private void save(OutputStream out, INodeSymlink n) throws IOException {
INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
.newBuilder()
.setPermission(buildPermissionStatus(n, parent.getStringMap()))
.setPermission(buildPermissionStatus(n, parent.getSaverContext().getStringMap()))
.setTarget(ByteString.copyFrom(n.getSymlink()));
INodeSection.INode r = buildINodeCommon(n)
.setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();

View File

@ -73,12 +73,56 @@
public final class FSImageFormatProtobuf {
private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
public static final class LoaderContext {
private String[] stringTable;
public String[] getStringTable() {
return stringTable;
}
}
public static final class SaverContext {
public static class DeduplicationMap<E> {
private final Map<E, Integer> map = Maps.newHashMap();
private DeduplicationMap() {}
static <T> DeduplicationMap<T> newMap() {
return new DeduplicationMap<T>();
}
int getId(E value) {
if (value == null) {
return 0;
}
Integer v = map.get(value);
if (v == null) {
int nv = map.size() + 1;
map.put(value, nv);
return nv;
}
return v;
}
int size() {
return map.size();
}
Set<Entry<E, Integer>> entrySet() {
return map.entrySet();
}
}
private final DeduplicationMap<String> stringMap = DeduplicationMap.newMap();
public DeduplicationMap<String> getStringMap() {
return stringMap;
}
}
public static final class Loader implements FSImageFormat.AbstractLoader {
static final int MINIMUM_FILE_LENGTH = 8;
private final Configuration conf;
private final FSNamesystem fsn;
private String[] stringTable;
private final LoaderContext ctx;
/** The MD5 sum of the loaded file */
private MD5Hash imgDigest;
@ -88,6 +132,7 @@ public static final class Loader implements FSImageFormat.AbstractLoader {
Loader(Configuration conf, FSNamesystem fsn) {
this.conf = conf;
this.fsn = fsn;
this.ctx = new LoaderContext();
}
@Override
@ -100,8 +145,8 @@ public long getLoadedImageTxId() {
return imgTxId;
}
public String[] getStringTable() {
return stringTable;
public LoaderContext getLoaderContext() {
return ctx;
}
void load(File file) throws IOException {
@ -226,11 +271,11 @@ private void loadNameSystemSection(InputStream in) throws IOException {
private void loadStringTableSection(InputStream in) throws IOException {
StringTableSection s = StringTableSection.parseDelimitedFrom(in);
stringTable = new String[s.getNumEntry() + 1];
ctx.stringTable = new String[s.getNumEntry() + 1];
for (int i = 0; i < s.getNumEntry(); ++i) {
StringTableSection.Entry e = StringTableSection.Entry
.parseDelimitedFrom(in);
stringTable[e.getId()] = e.getStr();
ctx.stringTable[e.getId()] = e.getStr();
}
}
@ -269,9 +314,10 @@ private void loadCacheManagerSection(InputStream in) throws IOException {
public static final class Saver {
private final SaveNamespaceContext context;
private final SaverContext saverContext;
private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
private MD5Hash savedDigest;
private StringMap stringMap = new StringMap();
private FileChannel fileChannel;
// OutputStream for the section data
@ -282,6 +328,7 @@ public static final class Saver {
Saver(SaveNamespaceContext context) {
this.context = context;
this.saverContext = new SaverContext();
}
public MD5Hash getSavedDigest() {
@ -292,6 +339,10 @@ public SaveNamespaceContext getContext() {
return context;
}
public SaverContext getSaverContext() {
return saverContext;
}
public void commitSection(FileSummary.Builder summary, SectionName name)
throws IOException {
long oldOffset = currentOffset;
@ -465,48 +516,15 @@ private void saveStringTableSection(FileSummary.Builder summary)
throws IOException {
OutputStream out = sectionOutputStream;
StringTableSection.Builder b = StringTableSection.newBuilder()
.setNumEntry(stringMap.size());
.setNumEntry(saverContext.stringMap.size());
b.build().writeDelimitedTo(out);
for (Entry<String, Integer> e : stringMap.entrySet()) {
for (Entry<String, Integer> e : saverContext.stringMap.entrySet()) {
StringTableSection.Entry.Builder eb = StringTableSection.Entry
.newBuilder().setId(e.getValue()).setStr(e.getKey());
eb.build().writeDelimitedTo(out);
}
commitSection(summary, SectionName.STRING_TABLE);
}
public StringMap getStringMap() {
return stringMap;
}
}
public static class StringMap {
private final Map<String, Integer> stringMap;
public StringMap() {
stringMap = Maps.newHashMap();
}
int getStringId(String str) {
if (str == null) {
return 0;
}
Integer v = stringMap.get(str);
if (v == null) {
int nv = stringMap.size() + 1;
stringMap.put(str, nv);
return nv;
}
return v;
}
int size() {
return stringMap.size();
}
Set<Entry<String, Integer>> entrySet() {
return stringMap.entrySet();
}
}
/**

View File

@ -6847,7 +6847,12 @@ public boolean isAvoidingStaleDataNodesForWrite() {
return this.blockManager.getDatanodeManager()
.shouldAvoidStaleDataNodesForWrite();
}
@Override // FSClusterStats
public int getNumDatanodesInService() {
return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
}
public SnapshotManager getSnapshotManager() {
return snapshotManager;
}

View File

@ -115,7 +115,7 @@ private void loadSnapshots(InputStream in, int size) throws IOException {
SnapshotSection.Snapshot pbs = SnapshotSection.Snapshot
.parseDelimitedFrom(in);
INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
parent.getStringTable());
parent.getLoaderContext().getStringTable());
int sid = pbs.getSnapshotId();
INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
.getInode(root.getId()).asDirectory();
@ -162,7 +162,8 @@ private void loadFileDiffList(InputStream in, INodeFile file, int size)
if (pbf.hasSnapshotCopy()) {
INodeSection.INodeFile fileInPb = pbf.getSnapshotCopy();
PermissionStatus permission = loadPermission(
fileInPb.getPermission(), parent.getStringTable());
fileInPb.getPermission(), parent.getLoaderContext()
.getStringTable());
copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
.toByteArray(), permission, null, fileInPb.getModificationTime(),
fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
@ -249,8 +250,9 @@ private void loadDirectoryDiffList(InputStream in, INodeDirectory dir,
}else if (diffInPb.hasSnapshotCopy()) {
INodeSection.INodeDirectory dirCopyInPb = diffInPb.getSnapshotCopy();
final byte[] name = diffInPb.getName().toByteArray();
PermissionStatus permission = loadPermission(dirCopyInPb
.getPermission(), parent.getStringTable());
PermissionStatus permission = loadPermission(
dirCopyInPb.getPermission(), parent.getLoaderContext()
.getStringTable());
long modTime = dirCopyInPb.getModificationTime();
boolean noQuota = dirCopyInPb.getNsQuota() == -1
&& dirCopyInPb.getDsQuota() == -1;
@ -312,7 +314,7 @@ public void serializeSnapshotSection(OutputStream out) throws IOException {
SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
.newBuilder().setSnapshotId(s.getId());
INodeSection.INodeDirectory.Builder db = buildINodeDirectory(sroot,
parent.getStringMap());
parent.getSaverContext().getStringMap());
INodeSection.INode r = INodeSection.INode.newBuilder()
.setId(sroot.getId())
.setType(INodeSection.INode.Type.DIRECTORY)
@ -370,7 +372,7 @@ private void serializeFileDiffList(INodeFile file, OutputStream out)
INodeFileAttributes copy = diff.snapshotINode;
if (copy != null) {
fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
.setSnapshotCopy(buildINodeFile(copy, parent.getStringMap()));
.setSnapshotCopy(buildINodeFile(copy, parent.getSaverContext().getStringMap()));
}
fb.build().writeDelimitedTo(out);
}
@ -411,7 +413,7 @@ private void serializeDirDiffList(INodeDirectory dir, OutputStream out)
if (!diff.isSnapshotRoot() && copy != null) {
db.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
.setSnapshotCopy(
buildINodeDirectory(copy, parent.getStringMap()));
buildINodeDirectory(copy, parent.getSaverContext().getStringMap()));
}
// process created list and deleted list
List<INode> created = diff.getChildrenDiff()

View File

@ -35,8 +35,8 @@
}
if (sticky) {
var exec = ((parms.perm % 10) & 1) == 1;
res[res.length - 1] = exec ? 't' : 'T';
var otherExec = ((ctx.current().permission % 10) & 1) == 1;
res = res.substr(0, res.length - 1) + (otherExec ? 't' : 'T');
}
chunk.write(dir + res);

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.VersionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestReplicationPolicyConsiderLoad {
private static NameNode namenode;
private static DatanodeManager dnManager;
private static List<DatanodeRegistration> dnrList;
private static DatanodeDescriptor[] dataNodes;
private static DatanodeStorageInfo[] storages;
@BeforeClass
public static void setupCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
final String[] racks = {
"/rack1",
"/rack1",
"/rack1",
"/rack2",
"/rack2",
"/rack2"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
DFSTestUtil.formatNameNode(conf);
namenode = new NameNode(conf);
int blockSize = 1024;
dnrList = new ArrayList<DatanodeRegistration>();
dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager();
// Register DNs
for (int i=0; i < 6; i++) {
DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i],
new StorageInfo(), new ExportedBlockKeys(), VersionInfo.getVersion());
dnrList.add(dnr);
dnManager.registerDatanode(dnr);
dataNodes[i].getStorageInfos()[0].setUtilizationForTesting(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L);
dataNodes[i].updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]),
0L, 0L, 0, 0);
}
}
/**
* Tests that chooseTarget with considerLoad set to true correctly calculates
* load with decommissioned nodes.
*/
@Test
public void testChooseTargetWithDecomNodes() throws IOException {
namenode.getNamesystem().writeLock();
try {
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
// returns false
for (int i = 0; i < 3; i++) {
DatanodeInfo d = dnManager.getDatanodeByXferAddr(
dnrList.get(i).getIpAddr(),
dnrList.get(i).getXferPort());
d.setDecommissioned();
}
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
dnManager.handleHeartbeat(dnrList.get(3),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
blockPoolId, dataNodes[3].getCacheCapacity(),
dataNodes[3].getCacheRemaining(),
2, 0, 0);
dnManager.handleHeartbeat(dnrList.get(4),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
blockPoolId, dataNodes[4].getCacheCapacity(),
dataNodes[4].getCacheRemaining(),
4, 0, 0);
dnManager.handleHeartbeat(dnrList.get(5),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]),
blockPoolId, dataNodes[5].getCacheCapacity(),
dataNodes[5].getCacheRemaining(),
4, 0, 0);
// Call chooseTarget()
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
1024, StorageType.DEFAULT);
assertEquals(3, targets.length);
Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
Arrays.asList(targets));
for (int i = 3; i < storages.length; i++) {
assertTrue(targetSet.contains(storages[i]));
}
} finally {
dataNodes[0].stopDecommission();
dataNodes[1].stopDecommission();
dataNodes[2].stopDecommission();
namenode.getNamesystem().writeUnlock();
}
NameNode.LOG.info("Done working on it");
}
@AfterClass
public static void teardownCluster() {
if (namenode != null) namenode.stop();
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext.DeduplicationMap;
import org.junit.Assert;
import org.junit.Test;
public class TestDeduplicationMap {
@Test
public void testDeduplicationMap() {
DeduplicationMap<String> m = DeduplicationMap.newMap();
Assert.assertEquals(1, m.getId("1"));
Assert.assertEquals(2, m.getId("2"));
Assert.assertEquals(3, m.getId("3"));
Assert.assertEquals(1, m.getId("1"));
Assert.assertEquals(2, m.getId("2"));
Assert.assertEquals(3, m.getId("3"));
}
}

View File

@ -27,17 +27,12 @@
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.junit.Test;
public class TestFSImageStorageInspector {
private static final Log LOG = LogFactory.getLog(
TestFSImageStorageInspector.class);
/**
* Simple test with image, edits, and inprogress edits
*/

View File

@ -119,6 +119,9 @@ Release 2.4.0 - UNRELEASED
YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
Rinaldi via zjshen)
YARN-1637. Implemented a client library for Java users to post timeline
entities and events. (zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
@ -166,6 +169,13 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv)
YARN-1459. Changed ResourceManager to depend its service initialization
on the configuration-provider mechanism during startup too. (Xuan Gong via
vinodkv)
YARN-1706. Created an utility method to dump timeline records to JSON
strings. (zjshen)
OPTIMIZATIONS
BUG FIXES
@ -233,6 +243,9 @@ Release 2.4.0 - UNRELEASED
YARN-1672. YarnConfiguration is missing a default for
yarn.nodemanager.log.retain-seconds (Naren Koneru via kasha)
YARN-1698. Fixed default TimelineStore in code to match what is documented
in yarn-default.xml (Zhijie Shen via vinodkv)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -309,4 +309,10 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Multithreaded correctness warnings need to be ignored here as this is for creating the singleton.-->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider"/>
<Bug pattern="DC_DOUBLECHECK" />
</Match>
</FindBugsFilter>

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.conf;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -34,8 +33,8 @@
*/
public abstract class ConfigurationProvider {
public void init(Configuration conf) throws Exception {
initInternal(conf);
public void init(Configuration bootstrapConf) throws Exception {
initInternal(bootstrapConf);
}
public void close() throws Exception {
@ -43,19 +42,21 @@ public void close() throws Exception {
}
/**
* Get the configuration.
* Get the configuration and combine with bootstrapConf
* @param bootstrapConf Configuration
* @param name The configuration file name
* @return configuration
* @throws YarnException
* @throws IOException
*/
public abstract Configuration getConfiguration(String name)
throws YarnException, IOException;
public abstract Configuration getConfiguration(Configuration bootstrapConf,
String name) throws YarnException, IOException;
/**
* Derived classes initialize themselves using this method.
*/
public abstract void initInternal(Configuration conf) throws Exception;
public abstract void initInternal(Configuration bootstrapConf)
throws Exception;
/**
* Derived classes close themselves using this method.

View File

@ -33,12 +33,12 @@ public class ConfigurationProviderFactory {
/**
* Creates an instance of {@link ConfigurationProvider} using given
* configuration.
* @param conf
* @param bootstrapConf
* @return configurationProvider
*/
@SuppressWarnings("unchecked")
public static ConfigurationProvider
getConfigurationProvider(Configuration conf) {
getConfigurationProvider(Configuration bootstrapConf) {
Class<? extends ConfigurationProvider> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends ConfigurationProvider>)
@ -49,9 +49,11 @@ public class ConfigurationProviderFactory {
"Invalid default configuration provider class"
+ YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
}
ConfigurationProvider configurationProvider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class), conf);
ConfigurationProvider configurationProvider =
ReflectionUtils.newInstance(bootstrapConf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
defaultProviderClass, ConfigurationProvider.class),
bootstrapConf);
return configurationProvider;
}
}

View File

@ -79,6 +79,10 @@
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* A client library that can be used to post some information in terms of a
* number of conceptual entities.
*
* @See ATSEntity
*/
@Public
@Unstable
public abstract class TimelineClient extends AbstractService {
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
@Private
protected TimelineClient(String name) {
super(name);
}
/**
* <p>
* Post the information of a number of conceptual entities of an application
* to the timeline server. It is a blocking API. The method will not return
* until it gets the response from the timeline server.
* </p>
*
* @param entities
* the collection of {@link ATSEntity}
* @return the error information if the post entities are not correctly stored
* @throws IOException
* @throws YarnException
*/
@Public
public abstract ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException;
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
@Private
@Unstable
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final String RESOURCE_URI_STR = "/ws/v1/apptimeline/";
private static final Joiner JOINER = Joiner.on("");
private Client client;
private URI resURI;
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName());
ClientConfig cc = new DefaultClientConfig();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
client = Client.create(cc);
}
protected void serviceInit(Configuration conf) throws Exception {
resURI = new URI(JOINER.join(HttpConfig.getSchemePrefix(),
HttpConfig.isSecure() ? conf.get(
YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS) : conf.get(
YarnConfiguration.AHS_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS), RESOURCE_URI_STR));
super.serviceInit(conf);
}
@Override
public ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException {
ATSEntities entitiesContainer = new ATSEntities();
entitiesContainer.addEntities(Arrays.asList(entities));
ClientResponse resp = doPostingEntities(entitiesContainer);
if (resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled()) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
}
throw new YarnException(msg);
}
return resp.getEntity(ATSPutErrors.class);
}
@Private
@VisibleForTesting
public ClientResponse doPostingEntities(ATSEntities entities) {
WebResource webResource = client.resource(resURI);
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, entities);
}
}

View File

@ -382,11 +382,7 @@ private void killApplication(String applicationId) throws YarnException,
}
/**
* Kills the application with the application id as appId
*
* @param applicationId
* @throws YarnException
* @throws IOException
* Moves the application with the given ID to the given queue.
*/
private void moveApplicationAcrossQueues(String applicationId, String queue)
throws YarnException, IOException {

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClient {
private TimelineClientImpl client;
@Before
public void setup() {
client = spy((TimelineClientImpl) TimelineClient.createTimelineClient());
client.init(new YarnConfiguration());
client.start();
}
@After
public void tearDown() {
client.stop();
}
@Test
public void testPostEntities() throws Exception {
mockClientResponse(ClientResponse.Status.OK, false);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(0, errors.getErrors().size());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testPostEntitiesWithError() throws Exception {
mockClientResponse(ClientResponse.Status.OK, true);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(1, errors.getErrors().size());
Assert.assertEquals("test entity id", errors.getErrors().get(0)
.getEntityId());
Assert.assertEquals("test entity type", errors.getErrors().get(0)
.getEntityType());
Assert.assertEquals(ATSPutErrors.ATSPutError.IO_EXCEPTION,
errors.getErrors().get(0).getErrorCode());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testPostEntitiesNoResponse() throws Exception {
mockClientResponse(ClientResponse.Status.INTERNAL_SERVER_ERROR, false);
try {
client.postEntities(generateATSEntity());
Assert.fail("Exception is expected");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(
"Failed to get the response from the timeline server."));
}
}
private ClientResponse mockClientResponse(ClientResponse.Status status,
boolean hasError) {
ClientResponse response = mock(ClientResponse.class);
doReturn(response).when(client)
.doPostingEntities(any(ATSEntities.class));
when(response.getClientResponseStatus()).thenReturn(status);
ATSPutErrors.ATSPutError error = new ATSPutErrors.ATSPutError();
error.setEntityId("test entity id");
error.setEntityType("test entity type");
error.setErrorCode(ATSPutErrors.ATSPutError.IO_EXCEPTION);
ATSPutErrors errors = new ATSPutErrors();
if (hasError) {
errors.addError(error);
}
when(response.getEntity(ATSPutErrors.class)).thenReturn(errors);
return response;
}
private static ATSEntity generateATSEntity() {
ATSEntity entity = new ATSEntity();
entity.setEntityId("entity id");
entity.setEntityType("entity type");
entity.setStartTime(System.currentTimeMillis());
for (int i = 0; i < 2; ++i) {
ATSEvent event = new ATSEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType("test event type " + i);
event.addEventInfo("key1", "val1");
event.addEventInfo("key2", "val2");
entity.addEvent(event);
}
entity.addRelatedEntity("test ref type 1", "test ref id 1");
entity.addRelatedEntity("test ref type 2", "test ref id 2");
entity.addPrimaryFilter("pkey1", "pval1");
entity.addPrimaryFilter("pkey2", "pval2");
entity.addOtherInfo("okey1", "oval1");
entity.addOtherInfo("okey2", "oval2");
return entity;
}
}

View File

@ -675,7 +675,6 @@ public void testAppsHelpCommand() throws Exception {
int result = spyCli.run(new String[] { "-help" });
Assert.assertTrue(result == 0);
verify(spyCli).printUsage(any(Options.class));
System.err.println(sysOutStream.toString()); //todo sandyt remove this hejfkdsl
Assert.assertEquals(createApplicationCLIHelpMessage(),
sysOutStream.toString());

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -42,24 +41,24 @@ public class FileSystemBasedConfigurationProvider
private Path configDir;
@Override
public synchronized Configuration getConfiguration(String name)
throws IOException, YarnException {
public synchronized Configuration getConfiguration(Configuration bootstrapConf,
String name) throws IOException, YarnException {
Path configPath = new Path(this.configDir, name);
if (!fs.exists(configPath)) {
throw new YarnException("Can not find Configuration: " + name + " in "
+ configDir);
}
Configuration conf = new Configuration(false);
conf.addResource(fs.open(configPath));
return conf;
bootstrapConf.addResource(fs.open(configPath));
return bootstrapConf;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
public synchronized void initInternal(Configuration bootstrapConf)
throws Exception {
configDir =
new Path(conf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
new Path(bootstrapConf.get(YarnConfiguration.FS_BASED_RM_CONF_STORE,
YarnConfiguration.DEFAULT_FS_BASED_RM_CONF_STORE));
fs = configDir.getFileSystem(conf);
fs = configDir.getFileSystem(bootstrapConf);
if (!fs.exists(configDir)) {
fs.mkdirs(configDir);
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -31,13 +30,13 @@
public class LocalConfigurationProvider extends ConfigurationProvider {
@Override
public Configuration getConfiguration(String name)
throws IOException, YarnException {
return new Configuration();
public Configuration getConfiguration(Configuration bootstrapConf,
String name) throws IOException, YarnException {
return bootstrapConf;
}
@Override
public void initInternal(Configuration conf) throws Exception {
public void initInternal(Configuration bootstrapConf) throws Exception {
// Do nothing
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
/**
* The helper class for the timeline module.
*
*/
@Public
@Evolving
public class TimelineUtils {
private static ObjectMapper mapper;
static {
mapper = new ObjectMapper();
AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
mapper.setAnnotationIntrospector(introspector);
mapper.getSerializationConfig()
.setSerializationInclusion(Inclusion.NON_NULL);
}
/**
* Serialize a POJO object into a JSON string not in a pretty format
*
* @param o
* an object to serialize
* @return a JSON string
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
*/
public static String dumpTimelineRecordtoJSON(Object o)
throws JsonGenerationException, JsonMappingException, IOException {
return dumpTimelineRecordtoJSON(o, false);
}
/**
* Serialize a POJO object into a JSON string
*
* @param o
* an object to serialize
* @param pretty
* whether in a pretty format or not
* @return a JSON string
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
*/
public static String dumpTimelineRecordtoJSON(Object o, boolean pretty)
throws JsonGenerationException, JsonMappingException, IOException {
if (pretty) {
return mapper.defaultPrettyPrintingWriter().writeValueAsString(o);
} else {
return mapper.writeValueAsString(o);
}
}
}

View File

@ -19,18 +19,23 @@
package org.apache.hadoop.yarn.api.records.apptimeline;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.util.TimelineUtils;
import org.junit.Test;
public class TestApplicationTimelineRecords {
private static final Log LOG =
LogFactory.getLog(TestApplicationTimelineRecords.class);
@Test
public void testATSEntities() {
public void testATSEntities() throws Exception {
ATSEntities entities = new ATSEntities();
for (int j = 0; j < 2; ++j) {
ATSEntity entity = new ATSEntity();
@ -53,6 +58,9 @@ public void testATSEntities() {
entity.addOtherInfo("okey2", "oval2");
entities.addEntity(entity);
}
LOG.info("Entities in JSON:");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
Assert.assertEquals(2, entities.getEntities().size());
ATSEntity entity1 = entities.getEntities().get(0);
Assert.assertEquals("entity id 0", entity1.getEntityId());
@ -71,7 +79,7 @@ public void testATSEntities() {
}
@Test
public void testATSEvents() {
public void testATSEvents() throws Exception {
ATSEvents events = new ATSEvents();
for (int j = 0; j < 2; ++j) {
ATSEvents.ATSEventsOfOneEntity partEvents =
@ -88,6 +96,9 @@ public void testATSEvents() {
}
events.addEvent(partEvents);
}
LOG.info("Events in JSON:");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(events, true));
Assert.assertEquals(2, events.getAllEvents().size());
ATSEvents.ATSEventsOfOneEntity partEvents1 = events.getAllEvents().get(0);
Assert.assertEquals("entity id 0", partEvents1.getEntityId());
@ -112,7 +123,7 @@ public void testATSEvents() {
}
@Test
public void testATSPutErrors() {
public void testATSPutErrors() throws Exception {
ATSPutErrors atsPutErrors = new ATSPutErrors();
ATSPutError error1 = new ATSPutError();
error1.setEntityId("entity id 1");
@ -127,6 +138,8 @@ public void testATSPutErrors() {
error2.setErrorCode(ATSPutError.IO_EXCEPTION);
errors.add(error2);
atsPutErrors.addErrors(errors);
LOG.info("Errors in JSON:");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(atsPutErrors, true));
Assert.assertEquals(3, atsPutErrors.getErrors().size());
ATSPutError e = atsPutErrors.getErrors().get(0);

View File

@ -34,7 +34,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.MemoryApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@ -143,10 +143,8 @@ protected ApplicationHistoryManager createApplicationHistoryManager(
protected ApplicationTimelineStore createApplicationTimelineStore(
Configuration conf) {
// TODO: need to replace the MemoryApplicationTimelineStore.class with the
// LevelDB implementation
return ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.ATS_STORE, MemoryApplicationTimelineStore.class,
YarnConfiguration.ATS_STORE, LeveldbApplicationTimelineStore.class,
ApplicationTimelineStore.class), conf);
}

View File

@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
@ -45,11 +46,8 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -92,8 +90,6 @@ public class AdminService extends CompositeService implements
private Server server;
private InetSocketAddress masterServiceAddress;
private AccessControlList adminAcl;
private ConfigurationProvider configurationProvider = null;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -115,10 +111,6 @@ public synchronized void serviceInit(Configuration conf) throws Exception {
}
}
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
configurationProvider.init(conf);
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -139,9 +131,6 @@ protected synchronized void serviceStart() throws Exception {
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
if (this.configurationProvider != null) {
configurationProvider.close();
}
super.serviceStop();
}
@ -158,7 +147,10 @@ protected void startServer() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
if (rmContext.isHAEnabled()) {
@ -321,8 +313,8 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
RefreshQueuesResponse response =
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
Configuration conf =
getConfiguration(YarnConfiguration.CS_CONFIGURATION_FILE);
Configuration conf = getConfiguration(getConfig(),
YarnConfiguration.CS_CONFIGURATION_FILE);
rmContext.getScheduler().reinitialize(conf, this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
@ -376,7 +368,8 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
}
Configuration conf =
getConfiguration(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
getConfiguration(getConfig(),
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
RMAuditLogger.logSuccess(user.getShortUserName(),
argName, "AdminService");
@ -421,7 +414,7 @@ public RefreshAdminAclsResponse refreshAdminAcls(
throwStandbyException();
}
Configuration conf =
getConfiguration(YarnConfiguration.YARN_SITE_XML_FILE);
getConfiguration(getConfig(), YarnConfiguration.YARN_SITE_XML_FILE);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
@ -452,9 +445,10 @@ public RefreshServiceAclsResponse refreshServiceAcls(
throwStandbyException();
}
PolicyProvider policyProvider = new RMPolicyProvider();
PolicyProvider policyProvider = RMPolicyProvider.getInstance();
Configuration conf =
getConfiguration(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
getConfiguration(getConfig(),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
refreshServiceAcls(conf, policyProvider);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
@ -466,12 +460,13 @@ public RefreshServiceAclsResponse refreshServiceAcls(
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
synchronized void refreshServiceAcls(Configuration configuration,
private synchronized void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.configurationProvider instanceof LocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}
@ -521,9 +516,10 @@ public UpdateNodeResourceResponse updateNodeResource(
return response;
}
private synchronized Configuration getConfiguration(String confFileName)
throws YarnException, IOException {
return this.configurationProvider.getConfiguration(confFileName);
private synchronized Configuration getConfiguration(Configuration conf,
String confFileName) throws YarnException, IOException {
return this.rmContext.getConfigurationProvider().getConfiguration(conf,
confFileName);
}
@VisibleForTesting

View File

@ -105,7 +105,6 @@ public class ApplicationMasterService extends AbstractService implements
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
private boolean useLocalConfigurationProvider;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName());
@ -115,15 +114,6 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
@ -150,7 +140,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -591,10 +584,11 @@ public void unregisterAttempt(ApplicationAttemptId attemptId) {
public void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -136,7 +136,6 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
private boolean useLocalConfigurationProvider;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
@ -154,10 +153,6 @@ public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
@Override
protected void serviceInit(Configuration conf) throws Exception {
clientBindAddress = getBindAddress(conf);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -176,7 +171,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -809,10 +807,11 @@ private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -97,4 +98,5 @@ void setRMDelegationTokenSecretManager(
void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter);
ConfigurationProvider getConfigurationProvider();
}

View File

@ -23,8 +23,10 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -78,7 +80,7 @@ public class RMContextImpl implements RMContext {
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private ConfigurationProvider configurationProvider;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@ -119,8 +121,11 @@ public RMContextImpl(Dispatcher rmDispatcher,
} catch (Exception e) {
assert false;
}
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
}
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
@ -334,4 +339,13 @@ public void setRMApplicationHistoryWriter(
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
}

View File

@ -42,10 +42,13 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -154,7 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private boolean recoveryEnabled;
private String webAppAddress;
private ConfigurationProvider configurationProvider = null;
/** End of Active services */
private Configuration conf;
@ -182,6 +185,21 @@ protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.rmContext = new RMContextImpl();
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
this.configurationProvider.init(this.conf);
rmContext.setConfigurationProvider(configurationProvider);
if (!(this.configurationProvider instanceof LocalConfigurationProvider)) {
// load yarn-site.xml
this.conf =
this.configurationProvider.getConfiguration(this.conf,
YarnConfiguration.YARN_SITE_XML_FILE);
// load core-site.xml
this.conf =
this.configurationProvider.getConfiguration(this.conf,
YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
}
// register the handlers for all AlwaysOn services using setupDispatcher().
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
@ -884,6 +902,9 @@ protected void serviceStop() throws Exception {
if (fetcher != null) {
fetcher.stop();
}
if (configurationProvider != null) {
configurationProvider.close();
}
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);

View File

@ -95,7 +95,6 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private boolean useLocalConfigurationProvider;
static {
resync.setNodeAction(NodeAction.RESYNC);
@ -145,10 +144,6 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
super.serviceInit(conf);
}
@ -169,7 +164,10 @@ protected void serviceStart() throws Exception {
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) {
refreshServiceAcls(conf, new RMPolicyProvider());
refreshServiceAcls(
this.rmContext.getConfigurationProvider().getConfiguration(conf,
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
RMPolicyProvider.getInstance());
}
this.server.start();
@ -423,10 +421,11 @@ public static Node resolve(String hostName) {
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
if (this.useLocalConfigurationProvider) {
if (this.rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
} else {
this.server.refreshServiceAclWithConfigration(configuration,
this.server.refreshServiceAclWithLoadedConfiguration(configuration,
policyProvider);
}
}

View File

@ -196,7 +196,6 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
private boolean useLocalConfigurationProvider;
public CapacityScheduler() {}
@ -262,14 +261,21 @@ public Resource getClusterResources() {
@Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
Configuration configuration = new Configuration(conf);
if (!initialized) {
this.useLocalConfigurationProvider =
(LocalConfigurationProvider.class.isAssignableFrom(conf.getClass(
YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
LocalConfigurationProvider.class)));
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
if (rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider) {
this.conf = new CapacitySchedulerConfiguration(configuration, true);
} else {
try {
this.conf =
new CapacitySchedulerConfiguration(rmContext
.getConfigurationProvider().getConfiguration(configuration,
YarnConfiguration.CS_CONFIGURATION_FILE), false);
} catch (Exception e) {
throw new IOException(e);
}
}
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
@ -290,7 +296,8 @@ public Resource getClusterResources() {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf =
new CapacitySchedulerConfiguration(conf,
this.useLocalConfigurationProvider);
rmContext.getConfigurationProvider() instanceof
LocalConfigurationProvider);
validateConf(this.conf);
try {
LOG.info("Re-initializing queues...");
@ -316,6 +323,7 @@ public CSQueue hook(CSQueue queue) {
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.security.authorize;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
@ -37,6 +39,23 @@
@InterfaceStability.Unstable
public class RMPolicyProvider extends PolicyProvider {
private static RMPolicyProvider rmPolicyProvider = null;
private RMPolicyProvider() {}
@Private
@Unstable
public static RMPolicyProvider getInstance() {
if (rmPolicyProvider == null) {
synchronized(RMPolicyProvider.class) {
if (rmPolicyProvider == null) {
rmPolicyProvider = new RMPolicyProvider();
}
}
}
return rmPolicyProvider;
}
private static final Service[] resourceManagerServices =
new Service[] {
new Service(

View File

@ -26,7 +26,6 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -105,34 +104,34 @@ public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();
try {
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: capacity-scheduler.xml"));
}
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
String csConfFile = writeConfigurationXML(csConf,
"capacity-scheduler.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile));
uploadConfiguration(csConf, "capacity-scheduler.xml");
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
@ -159,20 +158,24 @@ public void testAdminAclsWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: yarn-site.xml"));
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
String aclStringBefore =
@ -180,10 +183,8 @@ public void testAdminAclsWithFileSystemBasedConfigurationProvider()
YarnConfiguration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, "world:anyone:rwcda");
String yarnConfFile = writeConfigurationXML(yarnConf, "yarn-site.xml");
uploadConfiguration(yarnConf, "yarn-site.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(yarnConfFile));
rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance());
String aclStringAfter =
@ -214,7 +215,6 @@ public void testServiceAclsRefreshWithLocalConfigurationProvider() {
}
}
@SuppressWarnings("resource")
@Test
public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
@ -224,33 +224,33 @@ public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider()
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
ResourceManager resourceManager = null;
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
// clean the remoteDirectory
cleanRemoteDirectory();
try {
resourceManager.adminService
.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
fail("Should throw an exception");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: hadoop-policy.xml"));
// expect to get an exception here
}
String aclsString = "alice,bob users,wheel";
//upload default configurations
uploadDefaultConfiguration();
Configuration conf = new Configuration();
conf.setBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
conf.set("security.applicationclient.protocol.acl", aclsString);
String hadoopConfFile = writeConfigurationXML(conf, "hadoop-policy.xml");
uploadConfiguration(conf, "core-site.xml");
try {
resourceManager = new ResourceManager();
resourceManager.init(configuration);
resourceManager.start();
} catch (Exception ex) {
fail("Should not get any exceptions");
}
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(hadoopConfFile));
String aclsString = "alice,bob users,wheel";
Configuration newConf = new Configuration();
newConf.set("security.applicationclient.protocol.acl", aclsString);
uploadConfiguration(newConf, "hadoop-policy.xml");
resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest
.newInstance());
@ -328,31 +328,31 @@ private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager,
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
fail("Should throw an exception");
} catch(Exception ex) {
// Expect exception here
}
// clean the remoteDirectory
cleanRemoteDirectory();
//upload default configurations
uploadDefaultConfiguration();
try {
rm.adminService.refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
fail("FileSystemBasedConfigurationProvider is used." +
" Should get an exception here");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Can not find Configuration: core-site.xml"));
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
} catch(Exception ex) {
fail("Should not get any exceptions");
}
Configuration coreConf = new Configuration(false);
coreConf.set("hadoop.proxyuser.test.groups", "test_groups");
coreConf.set("hadoop.proxyuser.test.hosts", "test_hosts");
String coreConfFile = writeConfigurationXML(coreConf,
"core-site.xml");
uploadConfiguration(coreConf, "core-site.xml");
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(coreConfFile));
rm.adminService.refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest.newInstance());
Assert.assertTrue(ProxyUsers.getProxyGroups()
@ -393,11 +393,29 @@ private void uploadToRemoteFileSystem(Path filePath)
fs.copyFromLocalFile(filePath, workingPath);
}
private void cleanRemoteDirectory() throws IOException {
if (fs.exists(workingPath)) {
for (FileStatus file : fs.listStatus(workingPath)) {
fs.delete(file.getPath(), true);
}
}
private void uploadConfiguration(Configuration conf, String confFileName)
throws IOException {
String csConfFile = writeConfigurationXML(conf, confFileName);
// upload the file into Remote File System
uploadToRemoteFileSystem(new Path(csConfFile));
}
private void uploadDefaultConfiguration() throws IOException {
Configuration conf = new Configuration();
uploadConfiguration(conf, "core-site.xml");
YarnConfiguration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
uploadConfiguration(yarnConf, "yarn-site.xml");
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
uploadConfiguration(csConf, "capacity-scheduler.xml");
Configuration hadoopPolicyConf = new Configuration(false);
hadoopPolicyConf
.addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
uploadConfiguration(hadoopPolicyConf, "hadoop-policy.xml");
}
}

View File

@ -40,6 +40,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -104,6 +105,7 @@ public class TestCapacityScheduler {
private static float B3_CAPACITY = 20;
private ResourceManager resourceManager = null;
private RMContext mockContext;
@Before
public void setUp() throws Exception {
@ -118,6 +120,9 @@ public void setUp() throws Exception {
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
mockContext = mock(RMContext.class);
when(mockContext.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
}
@After
@ -133,7 +138,7 @@ public void testConfValidation() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
scheduler.reinitialize(conf, null);
scheduler.reinitialize(conf, mockContext);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@ -147,7 +152,7 @@ public void testConfValidation() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
scheduler.reinitialize(conf, null);
scheduler.reinitialize(conf, mockContext);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
@ -353,7 +358,7 @@ null, new RMContainerTokenSecretManager(conf),
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
cs.reinitialize(conf,null);
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, 80f, 20f);
}
@ -503,7 +508,7 @@ null, new RMContainerTokenSecretManager(conf),
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, B3_CAPACITY);
conf.setCapacity(B4, B4_CAPACITY);
cs.reinitialize(conf,null);
cs.reinitialize(conf,mockContext);
checkQueueCapacities(cs, 80f, 20f);
// Verify parent for B4