Merge r1569890 through r1570083 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1570084 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-02-20 04:38:30 +00:00
commit 55aec006f4
43 changed files with 995 additions and 206 deletions

View File

@ -765,6 +765,19 @@ public void addResource(InputStream in, String name) {
addResourceObject(new Resource(in, name));
}
/**
* Add a configuration resource.
*
* The properties of this resource will override properties of previously
* added resources, unless they were marked <a href="#Final">final</a>.
*
* @param conf Configuration object from which to load properties
*/
public void addResource(Configuration conf) {
addResourceObject(new Resource(conf.getProps()));
}
/**
* Reload configuration from previously added resources.

View File

@ -377,6 +377,22 @@ public void close() throws IOException {
Thread.currentThread().interrupt();
}
}
/**
* Call shutdown(SHUT_RDWR) on the UNIX domain socket.
*
* @throws IOException
*/
public void shutdown() throws IOException {
refCount.reference();
boolean exc = true;
try {
shutdown0(fd);
exc = false;
} finally {
unreference(exc);
}
}
private native static void sendFileDescriptors0(int fd,
FileDescriptor descriptors[],

View File

@ -34,6 +34,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.AfterClass;
import org.junit.Assert;
@ -41,7 +43,6 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket.DomainChannel;
@ -727,4 +728,38 @@ public void testFdPassingPathSecurity() throws Exception {
tmp.close();
}
}
@Test(timeout=180000)
public void testShutdown() throws Exception {
final AtomicInteger bytesRead = new AtomicInteger(0);
final AtomicBoolean failed = new AtomicBoolean(false);
final DomainSocket[] socks = DomainSocket.socketpair();
Runnable reader = new Runnable() {
@Override
public void run() {
while (true) {
try {
int ret = socks[1].getInputStream().read();
if (ret == -1) return;
bytesRead.addAndGet(1);
} catch (IOException e) {
DomainSocket.LOG.error("reader error", e);
failed.set(true);
return;
}
}
}
};
Thread readerThread = new Thread(reader);
readerThread.start();
socks[0].getOutputStream().write(1);
socks[0].getOutputStream().write(2);
socks[0].getOutputStream().write(3);
Assert.assertTrue(readerThread.isAlive());
socks[0].shutdown();
readerThread.join();
Assert.assertFalse(failed.get());
Assert.assertEquals(3, bytesRead.get());
IOUtils.cleanup(null, socks);
}
}

View File

@ -416,6 +416,14 @@ Release 2.4.0 - UNRELEASED
HDFS-5726. Fix compilation error in AbstractINodeDiff for JDK7. (jing9)
HDFS-5973. add DomainSocket#shutdown method (cmccabe)
HDFS-5318. Support read-only and read-write paths to shared replicas.
(Eric Sirianni via Arpit Agarwal)
HDFS-5868. Make hsync implementation pluggable on the DataNode.
(Buddy Taylor via Arpit Agarwal)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@ -521,6 +529,14 @@ Release 2.4.0 - UNRELEASED
HDFS-5961. OIV cannot load fsimages containing a symbolic link. (kihwal)
HDFS-5483. NN should gracefully handle multiple block replicas on same DN.
(Arpit Agarwal)
HDFS-5742. DatanodeCluster (mini cluster of DNs) fails to start.
(Arpit Agarwal)
HDFS-5979. Typo and logger fix for fsimage PB code. (wang)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -1581,8 +1581,8 @@ public static DatanodeStorageProto convert(DatanodeStorage s) {
private static StorageState convertState(State state) {
switch(state) {
case READ_ONLY:
return StorageState.READ_ONLY;
case READ_ONLY_SHARED:
return StorageState.READ_ONLY_SHARED;
case NORMAL:
default:
return StorageState.NORMAL;
@ -1610,8 +1610,8 @@ public static DatanodeStorage convert(DatanodeStorageProto s) {
private static State convertState(StorageState state) {
switch(state) {
case READ_ONLY:
return DatanodeStorage.State.READ_ONLY;
case READ_ONLY_SHARED:
return DatanodeStorage.State.READ_ONLY_SHARED;
case NORMAL:
default:
return DatanodeStorage.State.NORMAL;

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -501,7 +502,10 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
chooseSourceDatanode(block, containingNodes,
containingLiveReplicasNodes, numReplicas,
UnderReplicatedBlocks.LEVEL);
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
@ -1040,7 +1044,7 @@ void addToInvalidates(final Block block, final DatanodeInfo datanode) {
*/
private void addToInvalidates(Block b) {
StringBuilder datanodes = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" ");
@ -1254,7 +1258,10 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
continue;
}
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
@ -1494,15 +1501,16 @@ DatanodeDescriptor chooseSourceDatanode(Block block,
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++;
corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned++;
decommissioned += countableReplica;
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
excess += countableReplica;
} else {
nodesContainingLiveReplicas.add(storage);
live++;
live += countableReplica;
}
containingNodes.add(node);
// Check if this replica is corrupt
@ -1880,7 +1888,8 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
if (storedBlock != null &&
(curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
}
}
@ -2581,7 +2590,7 @@ private void processOverReplicatedBlock(final Block block,
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (storage.areBlockContentsStale()) {
LOG.info("BLOCK* processOverReplicatedBlock: " +
@ -2910,7 +2919,7 @@ public NumberReplicas countNodes(Block b) {
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@ -2949,7 +2958,7 @@ int countLiveNodes(BlockInfo b) {
// else proceed with fast case
int live = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
live++;

View File

@ -605,7 +605,7 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
+ storageType);
return false;
}
if (storage.getState() == State.READ_ONLY) {
if (storage.getState() == State.READ_ONLY_SHARED) {
logNodeIsNotChosen(storage, "storage is read-only");
return false;
}

View File

@ -20,10 +20,14 @@
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.SetIterator;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
* This class maintains the map from a block to its metadata.
* block's metadata currently includes blockCollection it belongs to and
@ -135,6 +139,22 @@ Iterable<DatanodeStorageInfo> getStorages(Block b) {
return getStorages(blocks.get(b));
}
/**
* Searches for the block in the BlocksMap and
* returns {@link Iterable} of the storages the block belongs to
* <i>that are of the given {@link DatanodeStorage.State state}</i>.
*
* @param state DatanodeStorage state by which to filter the returned Iterable
*/
Iterable<DatanodeStorageInfo> getStorages(Block b, final DatanodeStorage.State state) {
return Iterables.filter(getStorages(blocks.get(b)), new Predicate<DatanodeStorageInfo>() {
@Override
public boolean apply(DatanodeStorageInfo storage) {
return storage.getState() == state;
}
});
}
/**
* For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} of the storages the block belongs to.

View File

@ -78,7 +78,6 @@ class BlockReceiver implements Closeable {
private boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private OutputStream cout = null; // output stream for cehcksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
@ -223,9 +222,8 @@ class BlockReceiver implements Closeable {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.getChecksumOut();
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
cout, HdfsConstants.SMALL_BUFFER_SIZE));
streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
@ -280,9 +278,9 @@ public void close() throws IOException {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (cout instanceof FileOutputStream)) {
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
streams.syncChecksumOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -302,9 +300,9 @@ public void close() throws IOException {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (out instanceof FileOutputStream)) {
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
streams.syncDataOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -338,9 +336,9 @@ void flushOrSync(boolean isSync) throws IOException {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (cout instanceof FileOutputStream)) {
if (isSync) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
streams.syncChecksumOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -349,9 +347,9 @@ void flushOrSync(boolean isSync) throws IOException {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (out instanceof FileOutputStream)) {
if (isSync) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
streams.syncDataOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@ -62,4 +64,23 @@ public void close() {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
}
/**
* Sync the data stream if it supports it.
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
((FileOutputStream)dataOut).getChannel().force(true);
}
}
/**
* Sync the checksum stream if it supports it.
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
((FileOutputStream)checksumOut).getChannel().force(true);
}
}
}

View File

@ -139,6 +139,7 @@ void format(FSNamesystem fsn, String clusterId) throws IOException {
"FSImage.format should be called with an uninitialized namesystem, has " +
fileCount + " files");
NamespaceInfo ns = NNStorage.newNamespaceInfo();
LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
ns.clusterID = clusterId;
storage.format(ns);

View File

@ -75,7 +75,7 @@ public final class FSImageFormatPBINode {
private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
.values();
private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
private static final Log LOG = LogFactory.getLog(FSImageFormatPBINode.class);
public final static class Loader {
public static PermissionStatus loadPermission(long id,

View File

@ -267,7 +267,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
}
break;
default:
LOG.warn("Unregconized section " + n);
LOG.warn("Unrecognized section " + n);
break;
}
}

View File

@ -58,6 +58,8 @@
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
@ -378,11 +380,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
boolean isCorrupt = lBlk.isCorrupt();
String blkName = block.toString();
DatanodeInfo[] locs = lBlk.getLocations();
res.totalReplicas += locs.length;
NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas();
res.totalReplicas += liveReplicas;
short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication;
if (locs.length > targetFileReplication) {
res.excessiveReplicas += (locs.length - targetFileReplication);
if (liveReplicas > targetFileReplication) {
res.excessiveReplicas += (liveReplicas - targetFileReplication);
res.numOverReplicatedBlocks += 1;
}
// Check if block is Corrupt
@ -392,10 +396,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n");
}
if (locs.length >= minReplication)
if (liveReplicas >= minReplication)
res.numMinReplicatedBlocks++;
if (locs.length < targetFileReplication && locs.length > 0) {
res.missingReplicas += (targetFileReplication - locs.length);
if (liveReplicas < targetFileReplication && liveReplicas > 0) {
res.missingReplicas += (targetFileReplication - liveReplicas);
res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++;
if (!showFiles) {
@ -404,7 +408,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
out.println(" Under replicated " + block +
". Target Replicas is " +
targetFileReplication + " but found " +
locs.length + " replica(s).");
liveReplicas + " replica(s).");
}
// verify block placement policy
BlockPlacementStatus blockPlacementStatus = bpPolicy
@ -421,13 +425,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
block + ". " + blockPlacementStatus.getErrorDescription());
}
report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (locs.length == 0) {
if (liveReplicas == 0) {
report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes());
missing++;
missize += block.getNumBytes();
} else {
report.append(" repl=" + locs.length);
report.append(" repl=" + liveReplicas);
if (showLocations || showRacks) {
StringBuilder sb = new StringBuilder("[");
for (int j = 0; j < locs.length; j++) {

View File

@ -28,7 +28,18 @@ public class DatanodeStorage {
/** The state of the storage. */
public enum State {
NORMAL,
READ_ONLY
/**
* A storage that represents a read-only path to replicas stored on a shared storage device.
* Replicas on {@link #READ_ONLY_SHARED} storage are not counted towards live replicas.
*
* <p>
* In certain implementations, a {@link #READ_ONLY_SHARED} storage may be correlated to
* its {@link #NORMAL} counterpart using the {@link DatanodeStorage#storageID}. This
* property should be used for debugging purposes only.
* </p>
*/
READ_ONLY_SHARED;
}
private final String storageID;

View File

@ -50,7 +50,7 @@ message DatanodeRegistrationProto {
message DatanodeStorageProto {
enum StorageState {
NORMAL = 0;
READ_ONLY = 1;
READ_ONLY_SHARED = 1;
}
required string storageUuid = 1;

View File

@ -68,9 +68,10 @@ public class DataNodeCluster {
static String dataNodeDirs = DATANODE_DIRS;
static final String USAGE =
"Usage: datanodecluster " +
" -n <numDataNodes> " +
" -n <numDataNodes> " +
" -bpid <bpid>" +
" [-racks <numRacks>] " +
" [-simulated] " +
" [-simulated [<simulatedCapacityPerDn>]] " +
" [-inject startingBlockId numBlocksPerDN]" +
" [-r replicationFactorForInjectedBlocks]" +
" [-d dataNodeDirs]\n" +
@ -91,7 +92,7 @@ static void printUsageExit(String err) {
printUsageExit();
}
public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
int numDataNodes = 0;
int numRacks = 0;
boolean inject = false;
@ -99,6 +100,8 @@ public static void main(String[] args) {
int numBlocksPerDNtoInject = 0;
int replication = 1;
boolean checkDataNodeAddrConfig = false;
long simulatedCapacityPerDn = SimulatedFSDataset.DEFAULT_CAPACITY;
String bpid = null;
Configuration conf = new HdfsConfiguration();
@ -115,7 +118,7 @@ public static void main(String[] args) {
numRacks = Integer.parseInt(args[i]);
} else if (args[i].equals("-r")) {
if (++i >= args.length || args[i].startsWith("-")) {
printUsageExit("Missing replicaiton factor");
printUsageExit("Missing replication factor");
}
replication = Integer.parseInt(args[i]);
} else if (args[i].equals("-d")) {
@ -125,6 +128,14 @@ public static void main(String[] args) {
dataNodeDirs = args[i];
} else if (args[i].equals("-simulated")) {
SimulatedFSDataset.setFactory(conf);
if ((i+1) < args.length && !args[i+1].startsWith("-")) {
simulatedCapacityPerDn = Long.parseLong(args[++i]);
}
} else if (args[i].equals("-bpid")) {
if (++i >= args.length || args[i].startsWith("-")) {
printUsageExit("Missing blockpoolid parameter");
}
bpid = args[i];
} else if (args[i].equals("-inject")) {
if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) {
System.out.print("-inject is valid only for simulated");
@ -153,6 +164,9 @@ public static void main(String[] args) {
printUsageExit("Replication must be less than or equal to numDataNodes");
}
if (bpid == null) {
printUsageExit("BlockPoolId must be provided");
}
String nameNodeAdr = FileSystem.getDefaultUri(conf).getAuthority();
if (nameNodeAdr == null) {
System.out.println("No name node address and port in config");
@ -162,9 +176,14 @@ public static void main(String[] args) {
System.out.println("Starting " + numDataNodes +
(simulated ? " Simulated " : " ") +
" Data Nodes that will connect to Name Node at " + nameNodeAdr);
System.setProperty("test.build.data", dataNodeDirs);
long simulatedCapacities[] = new long[numDataNodes];
for (int i = 0; i < numDataNodes; ++i) {
simulatedCapacities[i] = simulatedCapacityPerDn;
}
MiniDFSCluster mc = new MiniDFSCluster();
try {
mc.formatDataNodeDirs();
@ -182,13 +201,12 @@ public static void main(String[] args) {
//rack4DataNode[i] = racks[i%numRacks];
rack4DataNode[i] = rackPrefix + "-" + i%numRacks;
System.out.println("Data Node " + i + " using " + rack4DataNode[i]);
}
}
try {
mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR,
rack4DataNode, null, null, false, checkDataNodeAddrConfig);
rack4DataNode, null, simulatedCapacities, false, checkDataNodeAddrConfig);
Thread.sleep(10*1000); // Give the DN some time to connect to NN and init storage directories.
if (inject) {
long blockSize = 10;
System.out.println("Injecting " + numBlocksPerDNtoInject +
@ -203,7 +221,7 @@ public static void main(String[] args) {
}
for (int i = 1; i <= replication; ++i) {
// inject blocks for dn_i into dn_i and replica in dn_i's neighbors
mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks), bpid);
System.out.println("Injecting blocks of dn " + i_dn + " into dn" +
((i_dn + i- 1)% numDataNodes));
}

View File

@ -157,6 +157,7 @@ public static class Builder {
private boolean checkExitOnShutdown = true;
private boolean checkDataNodeAddrConfig = false;
private boolean checkDataNodeHostConfig = false;
private Configuration[] dnConfOverlays;
public Builder(Configuration conf) {
this.conf = conf;
@ -333,6 +334,19 @@ public Builder nnTopology(MiniDFSNNTopology topology) {
return this;
}
/**
* Default: null
*
* An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
*
* Useful for setting specific per-DataNode configuration parameters.
*/
public Builder dataNodeConfOverlays(Configuration[] dnConfOverlays) {
this.dnConfOverlays = dnConfOverlays;
return this;
}
/**
* Construct the actual MiniDFSCluster
*/
@ -375,7 +389,8 @@ protected MiniDFSCluster(Builder builder) throws IOException {
builder.nnTopology,
builder.checkExitOnShutdown,
builder.checkDataNodeAddrConfig,
builder.checkDataNodeHostConfig);
builder.checkDataNodeHostConfig,
builder.dnConfOverlays);
}
public class DataNodeProperties {
@ -625,7 +640,7 @@ public MiniDFSCluster(int nameNodePort,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, null, racks, hosts,
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false);
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
}
private void initMiniDFSCluster(
@ -638,7 +653,8 @@ private void initMiniDFSCluster(
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig)
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays)
throws IOException {
ExitUtil.disableSystemExit();
@ -703,7 +719,7 @@ private void initMiniDFSCluster(
startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
dnStartOpt != null ? dnStartOpt : startOpt,
racks, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@ -1110,7 +1126,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false, false);
simulatedCapacities, setupHostsFile, false, false, null);
}
/**
@ -1124,7 +1140,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
}
/**
@ -1151,7 +1167,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
* @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
*
* @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
* @throws IllegalStateException if NameNode has been shutdown
*/
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@ -1160,7 +1177,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
if (operation == StartupOption.RECOVER) {
return;
}
@ -1200,6 +1218,13 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ simulatedCapacities.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (dnConfOverlays != null
&& numDataNodes > dnConfOverlays.length) {
throw new IllegalArgumentException( "The length of dnConfOverlays ["
+ dnConfOverlays.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
String [] dnArgs = (operation == null ||
operation != StartupOption.ROLLBACK) ?
@ -1208,6 +1233,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
if (dnConfOverlays != null) {
dnConf.addResource(dnConfOverlays[i]);
}
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {
@ -2057,17 +2085,19 @@ public List<Map<DatanodeStorage, BlockListAsLongs>> getAllBlockReports(String bp
return result;
}
/**
* This method is valid only if the data nodes have simulated data
* @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes()
* @param blocksToInject - the blocks
* @param bpid - (optional) the block pool id to use for injecting blocks.
* If not supplied then it is queried from the in-process NameNode.
* @throws IOException
* if not simulatedFSDataset
* if any of blocks already exist in the data node
*
*/
public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException {
public void injectBlocks(int dataNodeIndex,
Iterable<Block> blocksToInject, String bpid) throws IOException {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
@ -2076,7 +2106,9 @@ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) thro
if (!(dataSet instanceof SimulatedFSDataset)) {
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
}
String bpid = getNamesystem().getBlockPoolId();
if (bpid == null) {
bpid = getNamesystem().getBlockPoolId();
}
SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
sdataset.injectBlocks(bpid, blocksToInject);
dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
@ -2101,25 +2133,6 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
/**
* This method is valid only if the data nodes have simulated data
* @param blocksToInject - blocksToInject[] is indexed in the same order as the list
* of datanodes returned by getDataNodes()
* @throws IOException
* if not simulatedFSDataset
* if any of blocks already exist in the data nodes
* Note the rest of the blocks are not injected.
*/
public void injectBlocks(Iterable<Block>[] blocksToInject)
throws IOException {
if (blocksToInject.length > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
for (int i = 0; i < blocksToInject.length; ++i) {
injectBlocks(i, blocksToInject[i]);
}
}
/**
* Set the softLimit and hardLimit of client lease periods
*/
@ -2166,11 +2179,13 @@ public String getDataDirectory() {
* @return the base directory for this instance.
*/
protected String determineDfsBaseDir() {
String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null);
if (dfsdir == null) {
dfsdir = getBaseDirectory();
if (conf != null) {
final String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null);
if (dfsdir != null) {
return dfsdir;
}
}
return dfsdir;
return getBaseDirectory();
}
/**

View File

@ -210,7 +210,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);

View File

@ -168,7 +168,7 @@ public void testInjection() throws IOException {
// Insert all the blocks in the first data node
LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
cluster.injectBlocks(0, uniqueBlocks);
cluster.injectBlocks(0, uniqueBlocks, null);
dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()),

View File

@ -209,7 +209,7 @@ private void testUnevenDistribution(Configuration conf,
ClientProtocol.class).getProxy();
for(int i = 0; i < blocksDN.length; i++)
cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
final long totalCapacity = sum(capacities);
runBalancer(conf, totalUsedSpace, totalCapacity);

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -96,6 +97,11 @@ public static void setFactory(Configuration conf) {
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
public static final byte DEFAULT_DATABYTE = 9;
public static final String CONFIG_PROPERTY_STATE =
"dfs.datanode.simulateddatastorage.state";
private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL;
static final byte[] nullCrcFileData;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@ -325,9 +331,9 @@ void free(long amount) {
private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid();
private final long capacity; // in bytes
private final DatanodeStorage dnStorage;
synchronized long getFree() {
return capacity - getUsed();
@ -365,8 +371,11 @@ synchronized void free(String bpid, long amount) throws IOException {
getBPStorage(bpid).free(amount);
}
SimulatedStorage(long cap) {
SimulatedStorage(long cap, DatanodeStorage.State state) {
capacity = cap;
dnStorage = new DatanodeStorage(
"SimulatedStorage-" + DatanodeStorage.generateUuid(),
state, StorageType.DEFAULT);
}
synchronized void addBlockPool(String bpid) {
@ -390,11 +399,15 @@ private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
}
String getStorageUuid() {
return storageUuid;
return dnStorage.getStorageID();
}
DatanodeStorage getDnStorage() {
return dnStorage;
}
synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(new DatanodeStorage(getStorageUuid()),
return new StorageReport(dnStorage,
false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed());
}
@ -417,7 +430,8 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) {
registerMBean(datanodeUuid);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
}
public synchronized void injectBlocks(String bpid,
@ -488,7 +502,7 @@ synchronized BlockListAsLongs getBlockReport(String bpid) {
@Override
public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) {
return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid));
return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
}
@Override // FsDatasetSpi

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
/**
* This test verifies NameNode behavior when it gets unexpected block reports
* from DataNodes. The same block is reported by two different storages on
* the same DataNode. Excess replicas on the same DN should be ignored by the NN.
*/
public class TestBlockHasMultipleReplicasOnSameDN {
public static final Log LOG = LogFactory.getLog(TestBlockHasMultipleReplicasOnSameDN.class);
private static short NUM_DATANODES = 2;
private static final int BLOCK_SIZE = 1024;
private static final long NUM_BLOCKS = 5;
private static final long seed = 0x1BADF00DL;
private Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private String bpid;
@Before
public void startUpCluster() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
bpid = cluster.getNamesystem().getBlockPoolId();
}
@After
public void shutDownCluster() throws IOException {
if (cluster != null) {
fs.close();
cluster.shutdown();
cluster = null;
}
}
private String makeFileName(String prefix) {
return "/" + prefix + ".dat";
}
/**
* Verify NameNode behavior when a given DN reports multiple replicas
* of a given block.
*/
@Test
public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
String filename = makeFileName(GenericTestUtils.getMethodName());
Path filePath = new Path(filename);
// Write out a file with a few blocks.
DFSTestUtil.createFile(fs, filePath, BLOCK_SIZE, BLOCK_SIZE * NUM_BLOCKS,
BLOCK_SIZE, NUM_DATANODES, seed);
// Get the block list for the file with the block locations.
LocatedBlocks locatedBlocks = client.getLocatedBlocks(
filePath.toString(), 0, BLOCK_SIZE * NUM_BLOCKS);
// Generate a fake block report from one of the DataNodes, such
// that it reports one copy of each block on either storage.
DataNode dn = cluster.getDataNodes().get(0);
DatanodeRegistration dnReg = dn.getDNRegistrationForBP(bpid);
StorageBlockReport reports[] =
new StorageBlockReport[MiniDFSCluster.DIRS_PER_DATANODE];
ArrayList<Block> blocks = new ArrayList<Block>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
blocks.add(locatedBlock.getBlock().getLocalBlock());
}
for (int i = 0; i < MiniDFSCluster.DIRS_PER_DATANODE; ++i) {
BlockListAsLongs bll = new BlockListAsLongs(blocks, null);
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs());
}
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);
// Make sure that each block has two replicas, one on each DataNode.
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
DatanodeInfo[] locations = locatedBlock.getLocations();
assertThat(locations.length, is((int) NUM_DATANODES));
assertThat(locations[0].getDatanodeUuid(), not(locations[1].getDatanodeUuid()));
}
}
}

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Iterables;
/**
* Test proper {@link BlockManager} replication counting for {@link DatanodeStorage}s
* with {@link DatanodeStorage.State#READ_ONLY_SHARED READ_ONLY} state.
*
* Uses {@link SimulatedFSDataset} to inject read-only replicas into a DataNode.
*/
public class TestReadOnlySharedStorage {
public static final Log LOG = LogFactory.getLog(TestReadOnlySharedStorage.class);
private static short NUM_DATANODES = 3;
private static int RO_NODE_INDEX = 0;
private static final int BLOCK_SIZE = 1024;
private static final long seed = 0x1BADF00DL;
private static final Path PATH = new Path("/" + TestReadOnlySharedStorage.class.getName() + ".dat");
private static final int RETRIES = 10;
private Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private BlockManager blockManager;
private DatanodeManager datanodeManager;
private DatanodeInfo normalDataNode;
private DatanodeInfo readOnlyDataNode;
private Block block;
private ExtendedBlock extendedBlock;
/**
* Setup a {@link MiniDFSCluster}.
* Create a block with both {@link State#NORMAL} and {@link State#READ_ONLY_SHARED} replicas.
*/
@Before
public void setup() throws IOException, InterruptedException {
conf = new HdfsConfiguration();
SimulatedFSDataset.setFactory(conf);
Configuration[] overlays = new Configuration[NUM_DATANODES];
for (int i = 0; i < overlays.length; i++) {
overlays[i] = new Configuration();
if (i == RO_NODE_INDEX) {
overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE,
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.dataNodeConfOverlays(overlays)
.build();
fs = cluster.getFileSystem();
blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
datanodeManager = blockManager.getDatanodeManager();
client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0));
for (int i = 0; i < NUM_DATANODES; i++) {
DataNode dataNode = cluster.getDataNodes().get(i);
validateStorageState(
BlockManagerTestUtil.getStorageReportsForDatanode(
datanodeManager.getDatanode(dataNode.getDatanodeId())),
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
// Create a 1 block file
DFSTestUtil.createFile(fs, PATH, BLOCK_SIZE, BLOCK_SIZE,
BLOCK_SIZE, (short) 1, seed);
LocatedBlock locatedBlock = getLocatedBlock();
extendedBlock = locatedBlock.getBlock();
block = extendedBlock.getLocalBlock();
assertThat(locatedBlock.getLocations().length, is(1));
normalDataNode = locatedBlock.getLocations()[0];
readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId());
assertThat(normalDataNode, is(not(readOnlyDataNode)));
validateNumberReplicas(1);
// Inject the block into the datanode with READ_ONLY_SHARED storage
cluster.injectBlocks(0, RO_NODE_INDEX, Collections.singleton(block));
// There should now be 2 *locations* for the block
// Must wait until the NameNode has processed the block report for the injected blocks
waitForLocations(2);
}
@After
public void tearDown() throws IOException {
fs.delete(PATH, false);
if (cluster != null) {
fs.close();
cluster.shutdown();
cluster = null;
}
}
private void waitForLocations(int locations) throws IOException, InterruptedException {
for (int tries = 0; tries < RETRIES; )
try {
LocatedBlock locatedBlock = getLocatedBlock();
assertThat(locatedBlock.getLocations().length, is(locations));
break;
} catch (AssertionError e) {
if (++tries < RETRIES) {
Thread.sleep(1000);
} else {
throw e;
}
}
}
private LocatedBlock getLocatedBlock() throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(PATH.toString(), 0, BLOCK_SIZE);
assertThat(locatedBlocks.getLocatedBlocks().size(), is(1));
return Iterables.getOnlyElement(locatedBlocks.getLocatedBlocks());
}
private void validateStorageState(StorageReport[] storageReports, DatanodeStorage.State state) {
for (StorageReport storageReport : storageReports) {
DatanodeStorage storage = storageReport.getStorage();
assertThat(storage.getState(), is(state));
}
}
private void validateNumberReplicas(int expectedReplicas) throws IOException {
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
assertThat(numberReplicas.excessReplicas(), is(0));
assertThat(numberReplicas.corruptReplicas(), is(0));
assertThat(numberReplicas.decommissionedReplicas(), is(0));
assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
/**
* Verify that <tt>READ_ONLY_SHARED</tt> replicas are <i>not</i> counted towards the overall
* replication count, but <i>are</i> included as replica locations returned to clients for reads.
*/
@Test
public void testReplicaCounting() throws Exception {
// There should only be 1 *replica* (the READ_ONLY_SHARED doesn't count)
validateNumberReplicas(1);
fs.setReplication(PATH, (short) 2);
// There should now be 3 *locations* for the block, and 2 *replicas*
waitForLocations(3);
validateNumberReplicas(2);
}
/**
* Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even
* when the single NORMAL replica is offline (and the effective replication count is 0).
*/
@Test
public void testNormalReplicaOffline() throws Exception {
// Stop the datanode hosting the NORMAL replica
cluster.stopDataNode(normalDataNode.getXferAddr());
// Force NameNode to detect that the datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(), normalDataNode.getXferAddr());
// The live replica count should now be zero (since the NORMAL replica is offline)
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(0));
// The block should be reported as under-replicated
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
// The BlockManager should be able to heal the replication count back to 1
// by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
BlockManagerTestUtil.computeAllPendingWork(blockManager);
DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0);
// There should now be 2 *locations* for the block, and 1 *replica*
assertThat(getLocatedBlock().getLocations().length, is(2));
validateNumberReplicas(1);
}
/**
* Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted
* towards the corrupt replicas total.
*/
@Test
public void testReadOnlyReplicaCorrupt() throws Exception {
// "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica
client.reportBadBlocks(new LocatedBlock[] {
new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode })
});
// There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt
waitForLocations(1);
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.corruptReplicas(), is(0));
}
}

View File

@ -57,7 +57,7 @@ public class CreateEditsLog {
GenerationStamp.LAST_RESERVED_STAMP;
static void addFiles(FSEditLog editLog, int numFiles, short replication,
int blocksPerFile, long startingBlockId,
int blocksPerFile, long startingBlockId, long blockSize,
FileNameGenerator nameGenerator) {
PermissionStatus p = new PermissionStatus("joeDoe", "people",
@ -66,7 +66,6 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication,
INodeDirectory dirInode = new INodeDirectory(inodeId.nextValue(), null, p,
0L);
editLog.logMkDir(BASE_PATH, dirInode);
long blockSize = 10;
BlockInfo[] blocks = new BlockInfo[blocksPerFile];
for (int iB = 0; iB < blocksPerFile; ++iB) {
blocks[iB] =
@ -144,6 +143,7 @@ public static void main(String[] args)
int numFiles = 0;
short replication = 1;
int numBlocksPerFile = 0;
long blockSize = 10;
if (args.length == 0) {
printUsageExit();
@ -164,10 +164,16 @@ public static void main(String[] args)
if (numFiles <=0 || numBlocksPerFile <= 0) {
printUsageExit("numFiles and numBlocksPerFile most be greater than 0");
}
} else if (args[i].equals("-l")) {
if (i + 1 >= args.length) {
printUsageExit(
"Missing block length");
}
blockSize = Long.parseLong(args[++i]);
} else if (args[i].equals("-r") || args[i+1].startsWith("-")) {
if (i + 1 >= args.length) {
printUsageExit(
"Missing num files, starting block and/or number of blocks");
"Missing replication factor");
}
replication = Short.parseShort(args[++i]);
} else if (args[i].equals("-d")) {
@ -202,7 +208,7 @@ public static void main(String[] args)
FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
editLog.openForWrite();
addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId,
nameGenerator);
blockSize, nameGenerator);
editLog.logSync();
editLog.close();
}

View File

@ -29,6 +29,9 @@ Release 2.5.0 - UNRELEASED
BUG FIXES
YARN-1718. Fix a couple isTerminals in Fair Scheduler queue placement rules
(Sandy Ryza)
Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -206,6 +209,9 @@ Release 2.4.0 - UNRELEASED
available across RM failover by making using of a remote
configuration-provider. (Xuan Gong via vinodkv)
YARN-1171. Add default queue properties to Fair Scheduler documentation
(Naren Koneru via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
@ -309,6 +315,9 @@ Release 2.4.0 - UNRELEASED
application history store in the transition to the final state. (Contributed
by Zhijie Shen)
YARN-713. Fixed ResourceManager to not crash while building tokens when DNS
issues happen transmittently. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -464,9 +464,11 @@ public AllocateResponse allocate(AllocateRequest request)
blacklistAdditions, blacklistRemovals);
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -505,12 +507,6 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
@ -202,7 +203,8 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED_SAVING,
EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
@ -769,8 +771,9 @@ public void transition(RMAppAttemptImpl appAttempt,
private static final List<ContainerId> EMPTY_CONTAINER_RELEASE_LIST =
new ArrayList<ContainerId>();
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>();
new ArrayList<ResourceRequest>();
private static final class ScheduleTransition
implements
@ -803,29 +806,57 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
}
}
private static final class AMContainerAllocatedTransition
extends BaseTransition {
private static final class AMContainerAllocatedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST, null, null);
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null);
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers. Then,
// YarnScheduler#allocate will fetch it.
assert amContainerAllocation.getContainers().size() != 0;
// and is put in SchedulerApplication#newlyAllocatedContainers.
// Note that YarnScheduler#allocate is not guaranteed to be able to
// fetch it since container may not be fetchable for some reason like
// DNS unavailable causing container token not generated. As such, we
// return to the previous state and keep retry until am container is
// fetched.
if (amContainerAllocation.getContainers().size() == 0) {
appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED;
}
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
0));
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource());
appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt();
return RMAppAttemptState.ALLOCATED_SAVING;
}
}
private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) {
// start a new thread so that we are not blocking main dispatcher thread.
new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to resend the"
+ " ContainerAllocated Event.");
}
appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
appAttempt.applicationAttemptId));
}
}.start();
}
private static final class AttemptStoredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,

View File

@ -25,16 +25,7 @@
public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent {
private final Container container;
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId,
Container container) {
public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED);
this.container = container;
}
public Container getContainer() {
return this.container;
}
}

View File

@ -340,7 +340,7 @@ private static final class ContainerStartedTransition extends
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
container.appAttemptId, container.container));
container.appAttemptId));
}
}

View File

@ -31,11 +31,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class AbstractYarnScheduler implements ResourceScheduler {
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication> applications;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {

View File

@ -22,10 +22,9 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation {
@ -34,24 +33,24 @@ public class Allocation {
final Set<ContainerId> strictContainers;
final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources;
public Allocation(List<Container> containers, Resource resourceLimit) {
this(containers, resourceLimit, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers) {
this(containers, resourceLimit, strictContainers, null, null);
}
final List<NMToken> nmTokens;
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
this.fungibleContainers = fungibleContainers;
this.fungibleResources = fungibleResources;
this.nmTokens = nmTokens;
}
public List<Container> getContainers() {
@ -74,4 +73,8 @@ public List<ResourceRequest> getResourcePreemptions() {
return fungibleResources;
}
public List<NMToken> getNMTokens() {
return nmTokens;
}
}

View File

@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -339,21 +341,61 @@ public Resource getCurrentConsumption() {
return currentConsumption;
}
public synchronized List<Container> pullNewlyAllocatedContainers() {
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
for (RMContainer rmContainer : newlyAllocatedContainers) {
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED));
Container container = rmContainer.getContainer();
rmContainer.getContainer().setContainerToken(
rmContext.getContainerTokenSecretManager().createContainerToken(
rmContainer.getContainerId(), container.getNodeId(), getUser(),
container.getResource()));
returnContainerList.add(rmContainer.getContainer());
public static class ContainersAndNMTokensAllocation {
List<Container> containerList;
List<NMToken> nmTokenList;
public ContainersAndNMTokensAllocation(List<Container> containerList,
List<NMToken> nmTokenList) {
this.containerList = containerList;
this.nmTokenList = nmTokenList;
}
newlyAllocatedContainers.clear();
return returnContainerList;
public List<Container> getContainerList() {
return containerList;
}
public List<NMToken> getNMTokenList() {
return nmTokenList;
}
}
// Create container token and NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public synchronized ContainersAndNMTokensAllocation
pullNewlyAllocatedContainersAndNMTokens() {
List<Container> returnContainerList =
new ArrayList<Container>(newlyAllocatedContainers.size());
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
.hasNext();) {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
try {
// create container token and NMToken altogether.
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
if (nmToken != null) {
nmTokens.add(nmToken);
}
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
LOG.error(
"Error trying to assign container token to allocated container "
+ container.getId(), e);
continue;
}
returnContainerList.add(container);
i.remove();
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED));
}
return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
}
public synchronized void updateBlacklist(

View File

@ -104,9 +104,6 @@ public class CapacityScheduler extends AbstractYarnScheduler
private CSQueue root;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
@ -557,9 +554,6 @@ private synchronized void doneApplicationAttempt(
}
}
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0));
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,

View File

@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
null, currentContPreemption,
Collections.singletonList(rr));
ContainersAndNMTokensAllocation allocation =
pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(), getHeadroom(), null,
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}
}

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -143,12 +144,6 @@ public class FairScheduler extends AbstractYarnScheduler {
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
// Aggregate metrics
FSQueueMetrics rootMetrics;
@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom(), preemptionContainerIds);
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), preemptionContainerIds, null, null,
allocation.getNMTokenList());
}
}

View File

@ -162,7 +162,7 @@ protected String getQueueForApp(String requestedQueue,
@Override
public boolean isTerminal() {
return create;
return false;
}
}
@ -201,7 +201,7 @@ protected String getQueueForApp(String requestedQueue, String user,
@Override
public boolean isTerminal() {
return create;
return true;
}
}

View File

@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -80,6 +79,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -114,9 +114,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements
Configuration conf;
private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private boolean initialized;
@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() {
}
}
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@ -328,10 +324,11 @@ public Allocation allocate(
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
ContainersAndNMTokensAllocation allocation =
application.pullNewlyAllocatedContainersAndNMTokens();
return new Allocation(allocation.getContainerList(),
application.getHeadroom(), null, null, null,
allocation.getNMTokenList());
}
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@ -177,35 +175,39 @@ public void run() {
activateNextMasterKey();
}
}
public List<NMToken> createAndGetNMTokens(String applicationSubmitter,
ApplicationAttemptId appAttemptId, List<Container> containers) {
public NMToken createAndGetNMToken(String applicationSubmitter,
ApplicationAttemptId appAttemptId, Container container) {
try {
this.readLock.lock();
List<NMToken> nmTokens = new ArrayList<NMToken>();
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
NMToken nmToken = null;
if (nodeSet != null) {
for (Container container : containers) {
if (!nodeSet.contains(container.getNodeId())) {
if (!nodeSet.contains(container.getNodeId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending NMToken for nodeId : "
+ container.getNodeId().toString()
+ " for application attempt : " + appAttemptId.toString());
Token token = createNMToken(appAttemptId, container.getNodeId(),
applicationSubmitter);
NMToken nmToken =
NMToken.newInstance(container.getNodeId(), token);
nmTokens.add(nmToken);
// This will update the nmToken set.
}
Token token =
createNMToken(container.getId().getApplicationAttemptId(),
container.getNodeId(), applicationSubmitter);
nmToken = NMToken.newInstance(container.getNodeId(), token);
// The node set here is used for differentiating whether the NMToken
// has been issued for this node from the client's perspective. If
// this is an AM container, the NMToken is issued only for RM and so
// we should not update the node set.
if (container.getId().getId() != 1) {
nodeSet.add(container.getNodeId());
}
}
}
return nmTokens;
return nmToken;
} finally {
this.readLock.unlock();
}
}
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
try {
this.writeLock.lock();

View File

@ -598,8 +598,7 @@ private Container allocateApplicationAttempt() {
applicationAttempt.handle(
new RMAppAttemptContainerAllocatedEvent(
applicationAttempt.getAppAttemptId(),
container));
applicationAttempt.getAppAttemptId()));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState());

View File

@ -25,20 +25,29 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Test;
@ -149,4 +158,92 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception {
Assert.assertNotNull(containers.get(0).getContainerToken());
rm1.stop();
}
@Test
public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
// acquire the container.
SecurityUtilTestHelper.setTokenServiceUseIp(true);
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// not able to fetch the container;
Assert.assertEquals(0, containers.size());
SecurityUtilTestHelper.setTokenServiceUseIp(false);
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
// should be able to fetch the container;
Assert.assertEquals(1, containers.size());
}
private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService {
public TestRMSecretManagerService(Configuration conf,
RMContextImpl rmContext) {
super(conf, rmContext);
}
@Override
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf) {
@Override
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
capability);
}
};
}
}
// This is to test fetching AM container will be retried, if AM container is
// not fetchable since DNS is unavailable causing container token/NMtoken
// creation failure.
@Test(timeout = 20000)
public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
final YarnConfiguration conf = new YarnConfiguration();
MockRM rm1 = new MockRM(conf) {
@Override
protected RMSecretManagerService createRMSecretManagerService() {
return new TestRMSecretManagerService(conf, rmContext);
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
SecurityUtilTestHelper.setTokenServiceUseIp(true);
RMApp app1 = rm1.submitApp(200);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
nm1.nodeHeartbeat(true);
// fetching am container will fail, keep retrying 5 times.
while (numRetries <= 5) {
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
Assert.assertEquals(RMAppAttemptState.SCHEDULED,
attempt.getAppAttemptState());
System.out.println("Waiting for am container to be allocated.");
}
SecurityUtilTestHelper.setTokenServiceUseIp(false);
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
}

View File

@ -106,6 +106,17 @@ public void testTerminalRuleInMiddle() throws Exception {
parse(sb.toString());
}
@Test
public void testTerminals() throws Exception {
// Should make it through without an exception
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='secondaryGroupExistingQueue' create='true'/>");
sb.append(" <rule name='default' create='false'/>");
sb.append("</queuePlacementPolicy>");
parse(sb.toString());
}
private QueuePlacementPolicy parse(String str) throws Exception {
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =

View File

@ -265,7 +265,15 @@ Allocation file format
its fair share before it will try to preempt containers to take resources from
other queues.
* <<A defaultQueueSchedulingPolicy element>>, which sets the default scheduling
* <<A defaultMinSharePreemptionTimeout element>>, which sets the default number
of seconds the queue is under its minimum share before it will try to preempt
containers to take resources from other queues; overriden by
minSharePreemptionTimeout element in each queue if specified.
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overriden by maxRunningApps element in each queue.
* <<A defaultQueueSchedulingPolicy element>>, which sets the default scheduling
policy for queues; overriden by the schedulingPolicy element in each queue
if specified. Defaults to "fair".