diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java index c0d82004c0..480511efdf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -765,6 +765,19 @@ public void addResource(InputStream in, String name) { addResourceObject(new Resource(in, name)); } + /** + * Add a configuration resource. + * + * The properties of this resource will override properties of previously + * added resources, unless they were marked final. + * + * @param conf Configuration object from which to load properties + */ + public void addResource(Configuration conf) { + addResourceObject(new Resource(conf.getProps())); + } + + /** * Reload configuration from previously added resources. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java index e0d6b63c73..6166ba870b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java @@ -377,6 +377,22 @@ public void close() throws IOException { Thread.currentThread().interrupt(); } } + + /** + * Call shutdown(SHUT_RDWR) on the UNIX domain socket. + * + * @throws IOException + */ + public void shutdown() throws IOException { + refCount.reference(); + boolean exc = true; + try { + shutdown0(fd); + exc = false; + } finally { + unreference(exc); + } + } private native static void sendFileDescriptors0(int fd, FileDescriptor descriptors[], diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java index d6d9591ddd..9fe8fae201 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java @@ -34,6 +34,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.AfterClass; import org.junit.Assert; @@ -41,7 +43,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket.DomainChannel; @@ -727,4 +728,38 @@ public void testFdPassingPathSecurity() throws Exception { tmp.close(); } } + + @Test(timeout=180000) + public void testShutdown() throws Exception { + final AtomicInteger bytesRead = new AtomicInteger(0); + final AtomicBoolean failed = new AtomicBoolean(false); + final DomainSocket[] socks = DomainSocket.socketpair(); + Runnable reader = new Runnable() { + @Override + public void run() { + while (true) { + try { + int ret = socks[1].getInputStream().read(); + if (ret == -1) return; + bytesRead.addAndGet(1); + } catch (IOException e) { + DomainSocket.LOG.error("reader error", e); + failed.set(true); + return; + } + } + } + }; + Thread readerThread = new Thread(reader); + readerThread.start(); + socks[0].getOutputStream().write(1); + socks[0].getOutputStream().write(2); + socks[0].getOutputStream().write(3); + Assert.assertTrue(readerThread.isAlive()); + socks[0].shutdown(); + readerThread.join(); + Assert.assertFalse(failed.get()); + Assert.assertEquals(3, bytesRead.get()); + IOUtils.cleanup(null, socks); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index bd5917ee76..7e3c1f73af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -416,6 +416,14 @@ Release 2.4.0 - UNRELEASED HDFS-5726. Fix compilation error in AbstractINodeDiff for JDK7. (jing9) + HDFS-5973. add DomainSocket#shutdown method (cmccabe) + + HDFS-5318. Support read-only and read-write paths to shared replicas. + (Eric Sirianni via Arpit Agarwal) + + HDFS-5868. Make hsync implementation pluggable on the DataNode. + (Buddy Taylor via Arpit Agarwal) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery @@ -521,6 +529,14 @@ Release 2.4.0 - UNRELEASED HDFS-5961. OIV cannot load fsimages containing a symbolic link. (kihwal) + HDFS-5483. NN should gracefully handle multiple block replicas on same DN. + (Arpit Agarwal) + + HDFS-5742. DatanodeCluster (mini cluster of DNs) fails to start. + (Arpit Agarwal) + + HDFS-5979. Typo and logger fix for fsimage PB code. (wang) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index f6ed7c66e2..772d87b238 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -1581,8 +1581,8 @@ public static DatanodeStorageProto convert(DatanodeStorage s) { private static StorageState convertState(State state) { switch(state) { - case READ_ONLY: - return StorageState.READ_ONLY; + case READ_ONLY_SHARED: + return StorageState.READ_ONLY_SHARED; case NORMAL: default: return StorageState.NORMAL; @@ -1610,8 +1610,8 @@ public static DatanodeStorage convert(DatanodeStorageProto s) { private static State convertState(StorageState state) { switch(state) { - case READ_ONLY: - return DatanodeStorage.State.READ_ONLY; + case READ_ONLY_SHARED: + return DatanodeStorage.State.READ_ONLY_SHARED; case NORMAL: default: return DatanodeStorage.State.NORMAL; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index b0cea259de..41f450a720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -501,7 +502,10 @@ private void dumpBlockMeta(Block block, PrintWriter out) { chooseSourceDatanode(block, containingNodes, containingLiveReplicasNodes, numReplicas, UnderReplicatedBlocks.LEVEL); - assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas(); + + // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas(); int usableReplicas = numReplicas.liveReplicas() + numReplicas.decommissionedReplicas(); @@ -1040,7 +1044,7 @@ void addToInvalidates(final Block block, final DatanodeInfo datanode) { */ private void addToInvalidates(Block b) { StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); invalidateBlocks.add(b, node, false); datanodes.append(node).append(" "); @@ -1254,7 +1258,10 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { continue; } - assert liveReplicaNodes.size() == numReplicas.liveReplicas(); + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); + // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); @@ -1494,15 +1501,16 @@ DatanodeDescriptor chooseSourceDatanode(Block block, final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); + int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) - corrupt++; + corrupt += countableReplica; else if (node.isDecommissionInProgress() || node.isDecommissioned()) - decommissioned++; + decommissioned += countableReplica; else if (excessBlocks != null && excessBlocks.contains(block)) { - excess++; + excess += countableReplica; } else { nodesContainingLiveReplicas.add(storage); - live++; + live += countableReplica; } containingNodes.add(node); // Check if this replica is corrupt @@ -1880,7 +1888,8 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list - if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) { + if (storedBlock != null && + (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) { headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); } } @@ -2581,7 +2590,7 @@ private void processOverReplicatedBlock(final Block block, Collection nonExcess = new ArrayList(); Collection corruptNodes = corruptReplicas .getNodes(block); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (storage.areBlockContentsStale()) { LOG.info("BLOCK* processOverReplicatedBlock: " + @@ -2910,7 +2919,7 @@ public NumberReplicas countNodes(Block b) { int excess = 0; int stale = 0; Collection nodesCorrupt = corruptReplicas.getNodes(b); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { corrupt++; @@ -2949,7 +2958,7 @@ int countLiveNodes(BlockInfo b) { // else proceed with fast case int live = 0; Collection nodesCorrupt = corruptReplicas.getNodes(b); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node))) live++; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 8b740cd94c..da29f49bff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -605,7 +605,7 @@ private boolean isGoodTarget(DatanodeStorageInfo storage, + storageType); return false; } - if (storage.getState() == State.READ_ONLY) { + if (storage.getState() == State.READ_ONLY_SHARED) { logNodeIsNotChosen(storage, "storage is read-only"); return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 75bd770fc8..eafd05cdfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -20,10 +20,14 @@ import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet.SetIterator; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and @@ -135,6 +139,22 @@ Iterable getStorages(Block b) { return getStorages(blocks.get(b)); } + /** + * Searches for the block in the BlocksMap and + * returns {@link Iterable} of the storages the block belongs to + * that are of the given {@link DatanodeStorage.State state}. + * + * @param state DatanodeStorage state by which to filter the returned Iterable + */ + Iterable getStorages(Block b, final DatanodeStorage.State state) { + return Iterables.filter(getStorages(blocks.get(b)), new Predicate() { + @Override + public boolean apply(DatanodeStorageInfo storage) { + return storage.getState() == state; + } + }); + } + /** * For a block that has already been retrieved from the BlocksMap * returns {@link Iterable} of the storages the block belongs to. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3106f7f688..c1ed03ceb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -78,7 +78,6 @@ class BlockReceiver implements Closeable { private boolean needsChecksumTranslation; private OutputStream out = null; // to block file at local disk private FileDescriptor outFd; - private OutputStream cout = null; // output stream for cehcksum file private DataOutputStream checksumOut = null; // to crc file at local disk private int bytesPerChecksum; private int checksumSize; @@ -223,9 +222,8 @@ class BlockReceiver implements Closeable { LOG.warn("Could not get file descriptor for outputstream of class " + out.getClass()); } - this.cout = streams.getChecksumOut(); this.checksumOut = new DataOutputStream(new BufferedOutputStream( - cout, HdfsConstants.SMALL_BUFFER_SIZE)); + streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE)); // write data chunk header if creating a new replica if (isCreate) { BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); @@ -280,9 +278,9 @@ public void close() throws IOException { long flushStartNanos = System.nanoTime(); checksumOut.flush(); long flushEndNanos = System.nanoTime(); - if (syncOnClose && (cout instanceof FileOutputStream)) { + if (syncOnClose) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)cout).getChannel().force(true); + streams.syncChecksumOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -302,9 +300,9 @@ public void close() throws IOException { long flushStartNanos = System.nanoTime(); out.flush(); long flushEndNanos = System.nanoTime(); - if (syncOnClose && (out instanceof FileOutputStream)) { + if (syncOnClose) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)out).getChannel().force(true); + streams.syncDataOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -338,9 +336,9 @@ void flushOrSync(boolean isSync) throws IOException { long flushStartNanos = System.nanoTime(); checksumOut.flush(); long flushEndNanos = System.nanoTime(); - if (isSync && (cout instanceof FileOutputStream)) { + if (isSync) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)cout).getChannel().force(true); + streams.syncChecksumOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; @@ -349,9 +347,9 @@ void flushOrSync(boolean isSync) throws IOException { long flushStartNanos = System.nanoTime(); out.flush(); long flushEndNanos = System.nanoTime(); - if (isSync && (out instanceof FileOutputStream)) { + if (isSync) { long fsyncStartNanos = flushEndNanos; - ((FileOutputStream)out).getChannel().force(true); + streams.syncDataOut(); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index 3866392d93..95044c825d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileOutputStream; import java.io.OutputStream; +import java.io.IOException; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -62,4 +64,23 @@ public void close() { IOUtils.closeStream(dataOut); IOUtils.closeStream(checksumOut); } -} \ No newline at end of file + + /** + * Sync the data stream if it supports it. + */ + public void syncDataOut() throws IOException { + if (dataOut instanceof FileOutputStream) { + ((FileOutputStream)dataOut).getChannel().force(true); + } + } + + /** + * Sync the checksum stream if it supports it. + */ + public void syncChecksumOut() throws IOException { + if (checksumOut instanceof FileOutputStream) { + ((FileOutputStream)checksumOut).getChannel().force(true); + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index a865a24a66..0a55c2b98f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -139,6 +139,7 @@ void format(FSNamesystem fsn, String clusterId) throws IOException { "FSImage.format should be called with an uninitialized namesystem, has " + fileCount + " files"); NamespaceInfo ns = NNStorage.newNamespaceInfo(); + LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID()); ns.clusterID = clusterId; storage.format(ns); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 40a03ce971..5018aa03f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -75,7 +75,7 @@ public final class FSImageFormatPBINode { private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType .values(); - private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class); + private static final Log LOG = LogFactory.getLog(FSImageFormatPBINode.class); public final static class Loader { public static PermissionStatus loadPermission(long id, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 57a5274886..adc913758b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -267,7 +267,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { } break; default: - LOG.warn("Unregconized section " + n); + LOG.warn("Unrecognized section " + n); break; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 1f811bbeb0..49874b6a73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.net.NetUtils; @@ -378,11 +380,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException { boolean isCorrupt = lBlk.isCorrupt(); String blkName = block.toString(); DatanodeInfo[] locs = lBlk.getLocations(); - res.totalReplicas += locs.length; + NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock()); + int liveReplicas = numberReplicas.liveReplicas(); + res.totalReplicas += liveReplicas; short targetFileReplication = file.getReplication(); res.numExpectedReplicas += targetFileReplication; - if (locs.length > targetFileReplication) { - res.excessiveReplicas += (locs.length - targetFileReplication); + if (liveReplicas > targetFileReplication) { + res.excessiveReplicas += (liveReplicas - targetFileReplication); res.numOverReplicatedBlocks += 1; } // Check if block is Corrupt @@ -392,10 +396,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException { out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() + " block " + block.getBlockName()+"\n"); } - if (locs.length >= minReplication) + if (liveReplicas >= minReplication) res.numMinReplicatedBlocks++; - if (locs.length < targetFileReplication && locs.length > 0) { - res.missingReplicas += (targetFileReplication - locs.length); + if (liveReplicas < targetFileReplication && liveReplicas > 0) { + res.missingReplicas += (targetFileReplication - liveReplicas); res.numUnderReplicatedBlocks += 1; underReplicatedPerFile++; if (!showFiles) { @@ -404,7 +408,7 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException { out.println(" Under replicated " + block + ". Target Replicas is " + targetFileReplication + " but found " + - locs.length + " replica(s)."); + liveReplicas + " replica(s)."); } // verify block placement policy BlockPlacementStatus blockPlacementStatus = bpPolicy @@ -421,13 +425,13 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException { block + ". " + blockPlacementStatus.getErrorDescription()); } report.append(i + ". " + blkName + " len=" + block.getNumBytes()); - if (locs.length == 0) { + if (liveReplicas == 0) { report.append(" MISSING!"); res.addMissing(block.toString(), block.getNumBytes()); missing++; missize += block.getNumBytes(); } else { - report.append(" repl=" + locs.length); + report.append(" repl=" + liveReplicas); if (showLocations || showRacks) { StringBuilder sb = new StringBuilder("["); for (int j = 0; j < locs.length; j++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java index 271f71091d..09675cdf3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java @@ -28,7 +28,18 @@ public class DatanodeStorage { /** The state of the storage. */ public enum State { NORMAL, - READ_ONLY + + /** + * A storage that represents a read-only path to replicas stored on a shared storage device. + * Replicas on {@link #READ_ONLY_SHARED} storage are not counted towards live replicas. + * + *

+ * In certain implementations, a {@link #READ_ONLY_SHARED} storage may be correlated to + * its {@link #NORMAL} counterpart using the {@link DatanodeStorage#storageID}. This + * property should be used for debugging purposes only. + *

+ */ + READ_ONLY_SHARED; } private final String storageID; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 160f953b29..5e78dcfe7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -50,7 +50,7 @@ message DatanodeRegistrationProto { message DatanodeStorageProto { enum StorageState { NORMAL = 0; - READ_ONLY = 1; + READ_ONLY_SHARED = 1; } required string storageUuid = 1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java index f3b3ad2af5..01d2c85afe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java @@ -68,9 +68,10 @@ public class DataNodeCluster { static String dataNodeDirs = DATANODE_DIRS; static final String USAGE = "Usage: datanodecluster " + - " -n " + + " -n " + + " -bpid " + " [-racks ] " + - " [-simulated] " + + " [-simulated []] " + " [-inject startingBlockId numBlocksPerDN]" + " [-r replicationFactorForInjectedBlocks]" + " [-d dataNodeDirs]\n" + @@ -91,7 +92,7 @@ static void printUsageExit(String err) { printUsageExit(); } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { int numDataNodes = 0; int numRacks = 0; boolean inject = false; @@ -99,6 +100,8 @@ public static void main(String[] args) { int numBlocksPerDNtoInject = 0; int replication = 1; boolean checkDataNodeAddrConfig = false; + long simulatedCapacityPerDn = SimulatedFSDataset.DEFAULT_CAPACITY; + String bpid = null; Configuration conf = new HdfsConfiguration(); @@ -115,7 +118,7 @@ public static void main(String[] args) { numRacks = Integer.parseInt(args[i]); } else if (args[i].equals("-r")) { if (++i >= args.length || args[i].startsWith("-")) { - printUsageExit("Missing replicaiton factor"); + printUsageExit("Missing replication factor"); } replication = Integer.parseInt(args[i]); } else if (args[i].equals("-d")) { @@ -125,6 +128,14 @@ public static void main(String[] args) { dataNodeDirs = args[i]; } else if (args[i].equals("-simulated")) { SimulatedFSDataset.setFactory(conf); + if ((i+1) < args.length && !args[i+1].startsWith("-")) { + simulatedCapacityPerDn = Long.parseLong(args[++i]); + } + } else if (args[i].equals("-bpid")) { + if (++i >= args.length || args[i].startsWith("-")) { + printUsageExit("Missing blockpoolid parameter"); + } + bpid = args[i]; } else if (args[i].equals("-inject")) { if (!FsDatasetSpi.Factory.getFactory(conf).isSimulated()) { System.out.print("-inject is valid only for simulated"); @@ -153,6 +164,9 @@ public static void main(String[] args) { printUsageExit("Replication must be less than or equal to numDataNodes"); } + if (bpid == null) { + printUsageExit("BlockPoolId must be provided"); + } String nameNodeAdr = FileSystem.getDefaultUri(conf).getAuthority(); if (nameNodeAdr == null) { System.out.println("No name node address and port in config"); @@ -162,9 +176,14 @@ public static void main(String[] args) { System.out.println("Starting " + numDataNodes + (simulated ? " Simulated " : " ") + " Data Nodes that will connect to Name Node at " + nameNodeAdr); - + System.setProperty("test.build.data", dataNodeDirs); + long simulatedCapacities[] = new long[numDataNodes]; + for (int i = 0; i < numDataNodes; ++i) { + simulatedCapacities[i] = simulatedCapacityPerDn; + } + MiniDFSCluster mc = new MiniDFSCluster(); try { mc.formatDataNodeDirs(); @@ -182,13 +201,12 @@ public static void main(String[] args) { //rack4DataNode[i] = racks[i%numRacks]; rack4DataNode[i] = rackPrefix + "-" + i%numRacks; System.out.println("Data Node " + i + " using " + rack4DataNode[i]); - - } } try { mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR, - rack4DataNode, null, null, false, checkDataNodeAddrConfig); + rack4DataNode, null, simulatedCapacities, false, checkDataNodeAddrConfig); + Thread.sleep(10*1000); // Give the DN some time to connect to NN and init storage directories. if (inject) { long blockSize = 10; System.out.println("Injecting " + numBlocksPerDNtoInject + @@ -203,7 +221,7 @@ public static void main(String[] args) { } for (int i = 1; i <= replication; ++i) { // inject blocks for dn_i into dn_i and replica in dn_i's neighbors - mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks)); + mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks), bpid); System.out.println("Injecting blocks of dn " + i_dn + " into dn" + ((i_dn + i- 1)% numDataNodes)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index f163faea63..97c9a456a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -157,6 +157,7 @@ public static class Builder { private boolean checkExitOnShutdown = true; private boolean checkDataNodeAddrConfig = false; private boolean checkDataNodeHostConfig = false; + private Configuration[] dnConfOverlays; public Builder(Configuration conf) { this.conf = conf; @@ -333,6 +334,19 @@ public Builder nnTopology(MiniDFSNNTopology topology) { return this; } + /** + * Default: null + * + * An array of {@link Configuration} objects that will overlay the + * global MiniDFSCluster Configuration for the corresponding DataNode. + * + * Useful for setting specific per-DataNode configuration parameters. + */ + public Builder dataNodeConfOverlays(Configuration[] dnConfOverlays) { + this.dnConfOverlays = dnConfOverlays; + return this; + } + /** * Construct the actual MiniDFSCluster */ @@ -375,7 +389,8 @@ protected MiniDFSCluster(Builder builder) throws IOException { builder.nnTopology, builder.checkExitOnShutdown, builder.checkDataNodeAddrConfig, - builder.checkDataNodeHostConfig); + builder.checkDataNodeHostConfig, + builder.dnConfOverlays); } public class DataNodeProperties { @@ -625,7 +640,7 @@ public MiniDFSCluster(int nameNodePort, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, operation, null, racks, hosts, simulatedCapacities, null, true, false, - MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false); + MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null); } private void initMiniDFSCluster( @@ -638,7 +653,8 @@ private void initMiniDFSCluster( boolean waitSafeMode, boolean setupHostsFile, MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { ExitUtil.disableSystemExit(); @@ -703,7 +719,7 @@ private void initMiniDFSCluster( startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, dnStartOpt != null ? dnStartOpt : startOpt, racks, hosts, simulatedCapacities, setupHostsFile, - checkDataNodeAddrConfig, checkDataNodeHostConfig); + checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays); waitClusterUp(); //make sure ProxyUsers uses the latest conf ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -1110,7 +1126,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, long[] simulatedCapacities, boolean setupHostsFile) throws IOException { startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, - simulatedCapacities, setupHostsFile, false, false); + simulatedCapacities, setupHostsFile, false, false, null); } /** @@ -1124,7 +1140,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, boolean setupHostsFile, boolean checkDataNodeAddrConfig) throws IOException { startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, - simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false); + simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null); } /** @@ -1151,7 +1167,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, * @param setupHostsFile add new nodes to dfs hosts files * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config * @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config - * + * @param dnConfOverlays An array of {@link Configuration} objects that will overlay the + * global MiniDFSCluster Configuration for the corresponding DataNode. * @throws IllegalStateException if NameNode has been shutdown */ public synchronized void startDataNodes(Configuration conf, int numDataNodes, @@ -1160,7 +1177,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, long[] simulatedCapacities, boolean setupHostsFile, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) throws IOException { + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { if (operation == StartupOption.RECOVER) { return; } @@ -1200,6 +1218,13 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, + simulatedCapacities.length + "] is less than the number of datanodes [" + numDataNodes + "]."); } + + if (dnConfOverlays != null + && numDataNodes > dnConfOverlays.length) { + throw new IllegalArgumentException( "The length of dnConfOverlays [" + + dnConfOverlays.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } String [] dnArgs = (operation == null || operation != StartupOption.ROLLBACK) ? @@ -1208,6 +1233,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { Configuration dnConf = new HdfsConfiguration(conf); + if (dnConfOverlays != null) { + dnConf.addResource(dnConfOverlays[i]); + } // Set up datanode address setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); if (manageDfsDirs) { @@ -2057,17 +2085,19 @@ public List> getAllBlockReports(String bp return result; } - /** * This method is valid only if the data nodes have simulated data * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes() * @param blocksToInject - the blocks + * @param bpid - (optional) the block pool id to use for injecting blocks. + * If not supplied then it is queried from the in-process NameNode. * @throws IOException * if not simulatedFSDataset * if any of blocks already exist in the data node * */ - public void injectBlocks(int dataNodeIndex, Iterable blocksToInject) throws IOException { + public void injectBlocks(int dataNodeIndex, + Iterable blocksToInject, String bpid) throws IOException { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } @@ -2076,7 +2106,9 @@ public void injectBlocks(int dataNodeIndex, Iterable blocksToInject) thro if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } - String bpid = getNamesystem().getBlockPoolId(); + if (bpid == null) { + bpid = getNamesystem().getBlockPoolId(); + } SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; sdataset.injectBlocks(bpid, blocksToInject); dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); @@ -2101,25 +2133,6 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0); } - /** - * This method is valid only if the data nodes have simulated data - * @param blocksToInject - blocksToInject[] is indexed in the same order as the list - * of datanodes returned by getDataNodes() - * @throws IOException - * if not simulatedFSDataset - * if any of blocks already exist in the data nodes - * Note the rest of the blocks are not injected. - */ - public void injectBlocks(Iterable[] blocksToInject) - throws IOException { - if (blocksToInject.length > dataNodes.size()) { - throw new IndexOutOfBoundsException(); - } - for (int i = 0; i < blocksToInject.length; ++i) { - injectBlocks(i, blocksToInject[i]); - } - } - /** * Set the softLimit and hardLimit of client lease periods */ @@ -2166,11 +2179,13 @@ public String getDataDirectory() { * @return the base directory for this instance. */ protected String determineDfsBaseDir() { - String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null); - if (dfsdir == null) { - dfsdir = getBaseDirectory(); + if (conf != null) { + final String dfsdir = conf.get(HDFS_MINIDFS_BASEDIR, null); + if (dfsdir != null) { + return dfsdir; + } } - return dfsdir; + return getBaseDirectory(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java index 42ec9f84df..ae5028c282 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java @@ -210,7 +210,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, long[] simulatedCapacities, boolean setupHostsFile, boolean checkDataNodeAddrConfig, - boolean checkDataNodeHostConfig) throws IOException { + boolean checkDataNodeHostConfig, + Configuration[] dnConfOverlays) throws IOException { startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks, NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java index 1f2fbbd8e4..a63c8d852a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java @@ -168,7 +168,7 @@ public void testInjection() throws IOException { // Insert all the blocks in the first data node LOG.info("Inserting " + uniqueBlocks.size() + " blocks"); - cluster.injectBlocks(0, uniqueBlocks); + cluster.injectBlocks(0, uniqueBlocks, null); dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 653fa79cc8..39f2f3cb95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -209,7 +209,7 @@ private void testUnevenDistribution(Configuration conf, ClientProtocol.class).getProxy(); for(int i = 0; i < blocksDN.length; i++) - cluster.injectBlocks(i, Arrays.asList(blocksDN[i])); + cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); final long totalCapacity = sum(capacities); runBalancer(conf, totalUsedSpace, totalCapacity); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 3bf2f052fc..1b2d9ee8a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -96,6 +97,11 @@ public static void setFactory(Configuration conf) { public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte public static final byte DEFAULT_DATABYTE = 9; + public static final String CONFIG_PROPERTY_STATE = + "dfs.datanode.simulateddatastorage.state"; + private static final DatanodeStorage.State DEFAULT_STATE = + DatanodeStorage.State.NORMAL; + static final byte[] nullCrcFileData; static { DataChecksum checksum = DataChecksum.newDataChecksum( @@ -325,9 +331,9 @@ void free(long amount) { private static class SimulatedStorage { private Map map = new HashMap(); - private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid(); private final long capacity; // in bytes + private final DatanodeStorage dnStorage; synchronized long getFree() { return capacity - getUsed(); @@ -365,8 +371,11 @@ synchronized void free(String bpid, long amount) throws IOException { getBPStorage(bpid).free(amount); } - SimulatedStorage(long cap) { + SimulatedStorage(long cap, DatanodeStorage.State state) { capacity = cap; + dnStorage = new DatanodeStorage( + "SimulatedStorage-" + DatanodeStorage.generateUuid(), + state, StorageType.DEFAULT); } synchronized void addBlockPool(String bpid) { @@ -390,11 +399,15 @@ private SimulatedBPStorage getBPStorage(String bpid) throws IOException { } String getStorageUuid() { - return storageUuid; + return dnStorage.getStorageID(); + } + + DatanodeStorage getDnStorage() { + return dnStorage; } synchronized StorageReport getStorageReport(String bpid) { - return new StorageReport(new DatanodeStorage(getStorageUuid()), + return new StorageReport(dnStorage, false, getCapacity(), getUsed(), getFree(), map.get(bpid).getUsed()); } @@ -417,7 +430,8 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) { registerMBean(datanodeUuid); this.storage = new SimulatedStorage( - conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); + conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), + conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); } public synchronized void injectBlocks(String bpid, @@ -488,7 +502,7 @@ synchronized BlockListAsLongs getBlockReport(String bpid) { @Override public synchronized Map getBlockReports( String bpid) { - return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid)); + return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid)); } @Override // FsDatasetSpi diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java new file mode 100644 index 0000000000..0b28d554f1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * This test verifies NameNode behavior when it gets unexpected block reports + * from DataNodes. The same block is reported by two different storages on + * the same DataNode. Excess replicas on the same DN should be ignored by the NN. + */ +public class TestBlockHasMultipleReplicasOnSameDN { + public static final Log LOG = LogFactory.getLog(TestBlockHasMultipleReplicasOnSameDN.class); + + private static short NUM_DATANODES = 2; + private static final int BLOCK_SIZE = 1024; + private static final long NUM_BLOCKS = 5; + private static final long seed = 0x1BADF00DL; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private DFSClient client; + private String bpid; + + @Before + public void startUpCluster() throws IOException { + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_DATANODES) + .build(); + fs = cluster.getFileSystem(); + client = fs.getClient(); + bpid = cluster.getNamesystem().getBlockPoolId(); + } + + @After + public void shutDownCluster() throws IOException { + if (cluster != null) { + fs.close(); + cluster.shutdown(); + cluster = null; + } + } + + private String makeFileName(String prefix) { + return "/" + prefix + ".dat"; + } + + /** + * Verify NameNode behavior when a given DN reports multiple replicas + * of a given block. + */ + @Test + public void testBlockHasMultipleReplicasOnSameDN() throws IOException { + String filename = makeFileName(GenericTestUtils.getMethodName()); + Path filePath = new Path(filename); + + // Write out a file with a few blocks. + DFSTestUtil.createFile(fs, filePath, BLOCK_SIZE, BLOCK_SIZE * NUM_BLOCKS, + BLOCK_SIZE, NUM_DATANODES, seed); + + // Get the block list for the file with the block locations. + LocatedBlocks locatedBlocks = client.getLocatedBlocks( + filePath.toString(), 0, BLOCK_SIZE * NUM_BLOCKS); + + // Generate a fake block report from one of the DataNodes, such + // that it reports one copy of each block on either storage. + DataNode dn = cluster.getDataNodes().get(0); + DatanodeRegistration dnReg = dn.getDNRegistrationForBP(bpid); + StorageBlockReport reports[] = + new StorageBlockReport[MiniDFSCluster.DIRS_PER_DATANODE]; + + ArrayList blocks = new ArrayList(); + + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + blocks.add(locatedBlock.getBlock().getLocalBlock()); + } + + for (int i = 0; i < MiniDFSCluster.DIRS_PER_DATANODE; ++i) { + BlockListAsLongs bll = new BlockListAsLongs(blocks, null); + FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i); + DatanodeStorage dns = new DatanodeStorage(v.getStorageID()); + reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs()); + } + + // Should not assert! + cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports); + + // Get the block locations once again. + locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS); + + // Make sure that each block has two replicas, one on each DataNode. + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + DatanodeInfo[] locations = locatedBlock.getLocations(); + assertThat(locations.length, is((int) NUM_DATANODES)); + assertThat(locations[0].getDatanodeUuid(), not(locations[1].getDatanodeUuid())); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java new file mode 100644 index 0000000000..3815af5396 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; +import static org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * Test proper {@link BlockManager} replication counting for {@link DatanodeStorage}s + * with {@link DatanodeStorage.State#READ_ONLY_SHARED READ_ONLY} state. + * + * Uses {@link SimulatedFSDataset} to inject read-only replicas into a DataNode. + */ +public class TestReadOnlySharedStorage { + + public static final Log LOG = LogFactory.getLog(TestReadOnlySharedStorage.class); + + private static short NUM_DATANODES = 3; + private static int RO_NODE_INDEX = 0; + private static final int BLOCK_SIZE = 1024; + private static final long seed = 0x1BADF00DL; + private static final Path PATH = new Path("/" + TestReadOnlySharedStorage.class.getName() + ".dat"); + private static final int RETRIES = 10; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private DFSClient client; + + private BlockManager blockManager; + + private DatanodeManager datanodeManager; + private DatanodeInfo normalDataNode; + private DatanodeInfo readOnlyDataNode; + + private Block block; + + private ExtendedBlock extendedBlock; + + + /** + * Setup a {@link MiniDFSCluster}. + * Create a block with both {@link State#NORMAL} and {@link State#READ_ONLY_SHARED} replicas. + */ + @Before + public void setup() throws IOException, InterruptedException { + conf = new HdfsConfiguration(); + SimulatedFSDataset.setFactory(conf); + + Configuration[] overlays = new Configuration[NUM_DATANODES]; + for (int i = 0; i < overlays.length; i++) { + overlays[i] = new Configuration(); + if (i == RO_NODE_INDEX) { + overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE, + i == RO_NODE_INDEX + ? READ_ONLY_SHARED + : NORMAL); + } + } + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_DATANODES) + .dataNodeConfOverlays(overlays) + .build(); + fs = cluster.getFileSystem(); + blockManager = cluster.getNameNode().getNamesystem().getBlockManager(); + datanodeManager = blockManager.getDatanodeManager(); + client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), + cluster.getConfiguration(0)); + + for (int i = 0; i < NUM_DATANODES; i++) { + DataNode dataNode = cluster.getDataNodes().get(i); + validateStorageState( + BlockManagerTestUtil.getStorageReportsForDatanode( + datanodeManager.getDatanode(dataNode.getDatanodeId())), + i == RO_NODE_INDEX + ? READ_ONLY_SHARED + : NORMAL); + } + + // Create a 1 block file + DFSTestUtil.createFile(fs, PATH, BLOCK_SIZE, BLOCK_SIZE, + BLOCK_SIZE, (short) 1, seed); + + LocatedBlock locatedBlock = getLocatedBlock(); + extendedBlock = locatedBlock.getBlock(); + block = extendedBlock.getLocalBlock(); + + assertThat(locatedBlock.getLocations().length, is(1)); + normalDataNode = locatedBlock.getLocations()[0]; + readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId()); + assertThat(normalDataNode, is(not(readOnlyDataNode))); + + validateNumberReplicas(1); + + // Inject the block into the datanode with READ_ONLY_SHARED storage + cluster.injectBlocks(0, RO_NODE_INDEX, Collections.singleton(block)); + + // There should now be 2 *locations* for the block + // Must wait until the NameNode has processed the block report for the injected blocks + waitForLocations(2); + } + + @After + public void tearDown() throws IOException { + fs.delete(PATH, false); + + if (cluster != null) { + fs.close(); + cluster.shutdown(); + cluster = null; + } + } + + private void waitForLocations(int locations) throws IOException, InterruptedException { + for (int tries = 0; tries < RETRIES; ) + try { + LocatedBlock locatedBlock = getLocatedBlock(); + assertThat(locatedBlock.getLocations().length, is(locations)); + break; + } catch (AssertionError e) { + if (++tries < RETRIES) { + Thread.sleep(1000); + } else { + throw e; + } + } + } + + private LocatedBlock getLocatedBlock() throws IOException { + LocatedBlocks locatedBlocks = client.getLocatedBlocks(PATH.toString(), 0, BLOCK_SIZE); + assertThat(locatedBlocks.getLocatedBlocks().size(), is(1)); + return Iterables.getOnlyElement(locatedBlocks.getLocatedBlocks()); + } + + private void validateStorageState(StorageReport[] storageReports, DatanodeStorage.State state) { + for (StorageReport storageReport : storageReports) { + DatanodeStorage storage = storageReport.getStorage(); + assertThat(storage.getState(), is(state)); + } + } + + private void validateNumberReplicas(int expectedReplicas) throws IOException { + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); + assertThat(numberReplicas.excessReplicas(), is(0)); + assertThat(numberReplicas.corruptReplicas(), is(0)); + assertThat(numberReplicas.decommissionedReplicas(), is(0)); + assertThat(numberReplicas.replicasOnStaleNodes(), is(0)); + + BlockManagerTestUtil.updateState(blockManager); + assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L)); + assertThat(blockManager.getExcessBlocksCount(), is(0L)); + } + + /** + * Verify that READ_ONLY_SHARED replicas are not counted towards the overall + * replication count, but are included as replica locations returned to clients for reads. + */ + @Test + public void testReplicaCounting() throws Exception { + // There should only be 1 *replica* (the READ_ONLY_SHARED doesn't count) + validateNumberReplicas(1); + + fs.setReplication(PATH, (short) 2); + + // There should now be 3 *locations* for the block, and 2 *replicas* + waitForLocations(3); + validateNumberReplicas(2); + } + + /** + * Verify that the NameNode is able to still use READ_ONLY_SHARED replicas even + * when the single NORMAL replica is offline (and the effective replication count is 0). + */ + @Test + public void testNormalReplicaOffline() throws Exception { + // Stop the datanode hosting the NORMAL replica + cluster.stopDataNode(normalDataNode.getXferAddr()); + + // Force NameNode to detect that the datanode is down + BlockManagerTestUtil.noticeDeadDatanode( + cluster.getNameNode(), normalDataNode.getXferAddr()); + + // The live replica count should now be zero (since the NORMAL replica is offline) + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.liveReplicas(), is(0)); + + // The block should be reported as under-replicated + BlockManagerTestUtil.updateState(blockManager); + assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L)); + + // The BlockManager should be able to heal the replication count back to 1 + // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas + BlockManagerTestUtil.computeAllPendingWork(blockManager); + + DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0); + + // There should now be 2 *locations* for the block, and 1 *replica* + assertThat(getLocatedBlock().getLocations().length, is(2)); + validateNumberReplicas(1); + } + + /** + * Verify that corrupt READ_ONLY_SHARED replicas aren't counted + * towards the corrupt replicas total. + */ + @Test + public void testReadOnlyReplicaCorrupt() throws Exception { + // "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica + client.reportBadBlocks(new LocatedBlock[] { + new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode }) + }); + + // There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt + waitForLocations(1); + + // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count + NumberReplicas numberReplicas = blockManager.countNodes(block); + assertThat(numberReplicas.corruptReplicas(), is(0)); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java index 3358c1a951..db18e2743f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java @@ -57,7 +57,7 @@ public class CreateEditsLog { GenerationStamp.LAST_RESERVED_STAMP; static void addFiles(FSEditLog editLog, int numFiles, short replication, - int blocksPerFile, long startingBlockId, + int blocksPerFile, long startingBlockId, long blockSize, FileNameGenerator nameGenerator) { PermissionStatus p = new PermissionStatus("joeDoe", "people", @@ -66,7 +66,6 @@ static void addFiles(FSEditLog editLog, int numFiles, short replication, INodeDirectory dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L); editLog.logMkDir(BASE_PATH, dirInode); - long blockSize = 10; BlockInfo[] blocks = new BlockInfo[blocksPerFile]; for (int iB = 0; iB < blocksPerFile; ++iB) { blocks[iB] = @@ -144,6 +143,7 @@ public static void main(String[] args) int numFiles = 0; short replication = 1; int numBlocksPerFile = 0; + long blockSize = 10; if (args.length == 0) { printUsageExit(); @@ -164,10 +164,16 @@ public static void main(String[] args) if (numFiles <=0 || numBlocksPerFile <= 0) { printUsageExit("numFiles and numBlocksPerFile most be greater than 0"); } + } else if (args[i].equals("-l")) { + if (i + 1 >= args.length) { + printUsageExit( + "Missing block length"); + } + blockSize = Long.parseLong(args[++i]); } else if (args[i].equals("-r") || args[i+1].startsWith("-")) { if (i + 1 >= args.length) { printUsageExit( - "Missing num files, starting block and/or number of blocks"); + "Missing replication factor"); } replication = Short.parseShort(args[++i]); } else if (args[i].equals("-d")) { @@ -202,7 +208,7 @@ public static void main(String[] args) FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir); editLog.openForWrite(); addFiles(editLog, numFiles, replication, numBlocksPerFile, startingBlockId, - nameGenerator); + blockSize, nameGenerator); editLog.logSync(); editLog.close(); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 69f0c66738..e942e6689a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -29,6 +29,9 @@ Release 2.5.0 - UNRELEASED BUG FIXES + YARN-1718. Fix a couple isTerminals in Fair Scheduler queue placement rules + (Sandy Ryza) + Release 2.4.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -206,6 +209,9 @@ Release 2.4.0 - UNRELEASED available across RM failover by making using of a remote configuration-provider. (Xuan Gong via vinodkv) + YARN-1171. Add default queue properties to Fair Scheduler documentation + (Naren Koneru via Sandy Ryza) + OPTIMIZATIONS BUG FIXES @@ -309,6 +315,9 @@ Release 2.4.0 - UNRELEASED application history store in the transition to the final state. (Contributed by Zhijie Shen) + YARN-713. Fixed ResourceManager to not crash while building tokens when DNS + issues happen transmittently. (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 563dbe9156..db81dd86af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -464,9 +464,11 @@ public AllocateResponse allocate(AllocateRequest request) blacklistAdditions, blacklistRemovals); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); + if (!allocation.getContainers().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } // update the response with the deltas of node status changes List updatedNodes = new ArrayList(); @@ -505,12 +507,6 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8d69d08096..3b845b1b8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; @@ -202,7 +203,8 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, - RMAppAttemptState.ALLOCATED_SAVING, + EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, @@ -769,8 +771,9 @@ public void transition(RMAppAttemptImpl appAttempt, private static final List EMPTY_CONTAINER_RELEASE_LIST = new ArrayList(); + private static final List EMPTY_CONTAINER_REQUEST_LIST = - new ArrayList(); + new ArrayList(); private static final class ScheduleTransition implements @@ -803,29 +806,57 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerAllocatedTransition - extends BaseTransition { + private static final class AMContainerAllocatedTransition + implements + MultipleArcTransition { @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { // Acquire the AM container from the scheduler. - Allocation amContainerAllocation = appAttempt.scheduler.allocate( - appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, - EMPTY_CONTAINER_RELEASE_LIST, null, null); + Allocation amContainerAllocation = + appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, + null); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, - // and is put in SchedulerApplication#newlyAllocatedContainers. Then, - // YarnScheduler#allocate will fetch it. - assert amContainerAllocation.getContainers().size() != 0; + // and is put in SchedulerApplication#newlyAllocatedContainers. + + // Note that YarnScheduler#allocate is not guaranteed to be able to + // fetch it since container may not be fetchable for some reason like + // DNS unavailable causing container token not generated. As such, we + // return to the previous state and keep retry until am container is + // fetched. + if (amContainerAllocation.getContainers().size() == 0) { + appAttempt.retryFetchingAMContainer(appAttempt); + return RMAppAttemptState.SCHEDULED; + } // Set the masterContainer - appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( - 0)); + appAttempt.setMasterContainer(amContainerAllocation.getContainers() + .get(0)); appAttempt.getSubmissionContext().setResource( - appAttempt.getMasterContainer().getResource()); + appAttempt.getMasterContainer().getResource()); appAttempt.storeAttempt(); + return RMAppAttemptState.ALLOCATED_SAVING; } } - + + private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) { + // start a new thread so that we are not blocking main dispatcher thread. + new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting to resend the" + + " ContainerAllocated Event."); + } + appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( + appAttempt.applicationAttemptId)); + } + }.start(); + } + private static final class AttemptStoredTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java index e841f7af19..681f38c2c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAllocatedEvent.java @@ -25,16 +25,7 @@ public class RMAppAttemptContainerAllocatedEvent extends RMAppAttemptEvent { - private final Container container; - - public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId, - Container container) { + public RMAppAttemptContainerAllocatedEvent(ApplicationAttemptId appAttemptId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED); - this.container = container; } - - public Container getContainer() { - return this.container; - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 057c9ace7e..57fb703957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -340,7 +340,7 @@ private static final class ContainerStartedTransition extends @Override public void transition(RMContainerImpl container, RMContainerEvent event) { container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( - container.appAttemptId, container.container)); + container.appAttemptId)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4208d1db5e..0f3af41b01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -31,11 +31,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; public abstract class AbstractYarnScheduler implements ResourceScheduler { protected RMContext rmContext; protected Map applications; + protected final static List EMPTY_CONTAINER_LIST = + new ArrayList(); + protected static final Allocation EMPTY_ALLOCATION = new Allocation( + EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d8a3..3f2d8afd2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,24 +33,24 @@ public class Allocation { final Set strictContainers; final Set fungibleContainers; final List fungibleResources; - - public Allocation(List containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); - } - - public Allocation(List containers, Resource resourceLimit, - Set strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); - } + final List nmTokens; public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources) { + this(containers, resourceLimit,strictContainers, fungibleContainers, + fungibleResources, null); + } + + public Allocation(List containers, Resource resourceLimit, + Set strictContainers, Set fungibleContainers, + List fungibleResources, List nmTokens) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; this.fungibleResources = fungibleResources; + this.nmTokens = nmTokens; } public List getContainers() { @@ -74,4 +73,8 @@ public List getResourcePreemptions() { return fungibleResources; } + public List getNMTokens() { + return nmTokens; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b1801dc10d..f35f76ff63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -339,21 +341,61 @@ public Resource getCurrentConsumption() { return currentConsumption; } - public synchronized List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); - Container container = rmContainer.getContainer(); - rmContainer.getContainer().setContainerToken( - rmContext.getContainerTokenSecretManager().createContainerToken( - rmContainer.getContainerId(), container.getNodeId(), getUser(), - container.getResource())); - returnContainerList.add(rmContainer.getContainer()); + public static class ContainersAndNMTokensAllocation { + List containerList; + List nmTokenList; + + public ContainersAndNMTokensAllocation(List containerList, + List nmTokenList) { + this.containerList = containerList; + this.nmTokenList = nmTokenList; } - newlyAllocatedContainers.clear(); - return returnContainerList; + + public List getContainerList() { + return containerList; + } + + public List getNMTokenList() { + return nmTokenList; + } + } + + // Create container token and NMToken altogether, if either of them fails for + // some reason like DNS unavailable, do not return this container and keep it + // in the newlyAllocatedContainers waiting to be refetched. + public synchronized ContainersAndNMTokensAllocation + pullNewlyAllocatedContainersAndNMTokens() { + List returnContainerList = + new ArrayList(newlyAllocatedContainers.size()); + List nmTokens = new ArrayList(); + for (Iterator i = newlyAllocatedContainers.iterator(); i + .hasNext();) { + RMContainer rmContainer = i.next(); + Container container = rmContainer.getContainer(); + try { + // create container token and NMToken altogether. + container.setContainerToken(rmContext.getContainerTokenSecretManager() + .createContainerToken(container.getId(), container.getNodeId(), + getUser(), container.getResource())); + NMToken nmToken = + rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), + getApplicationAttemptId(), container); + if (nmToken != null) { + nmTokens.add(nmToken); + } + } catch (IllegalArgumentException e) { + // DNS might be down, skip returning this container. + LOG.error( + "Error trying to assign container token to allocated container " + + container.getId(), e); + continue; + } + returnContainerList.add(container); + i.remove(); + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + } + return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } public synchronized void updateBlacklist( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6826c4941f..b8f2376358 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -104,9 +104,6 @@ public class CapacityScheduler extends AbstractYarnScheduler private CSQueue root; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - static final Comparator queueComparator = new Comparator() { @Override public int compare(CSQueue q1, CSQueue q2) { @@ -557,9 +554,6 @@ private synchronized void doneApplicationAttempt( } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0)); - @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4be6b941d1..470cb106f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), - null, currentContPreemption, - Collections.singletonList(rr)); + ContainersAndNMTokensAllocation allocation = + pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), getHeadroom(), null, + currentContPreemption, Collections.singletonList(rr), + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e23de7b3e9..a852d7b924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -143,12 +144,6 @@ public class FairScheduler extends AbstractYarnScheduler { // How often fair shares are re-calculated (ms) protected long UPDATE_INTERVAL = 500; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); - // Aggregate metrics FSQueueMetrics rootMetrics; @@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom(), preemptionContainerIds); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), preemptionContainerIds, null, null, + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java index ac0df50954..6acba27479 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -162,7 +162,7 @@ protected String getQueueForApp(String requestedQueue, @Override public boolean isTerminal() { - return create; + return false; } } @@ -201,7 +201,7 @@ protected String getQueueForApp(String requestedQueue, String user, @Override public boolean isTerminal() { - return create; + return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index a2e01345ab..61628f95dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -80,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -114,9 +114,6 @@ public class FifoScheduler extends AbstractYarnScheduler implements Configuration conf; - private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; - private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); - protected Map nodes = new ConcurrentHashMap(); private boolean initialized; @@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() { } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -328,10 +324,11 @@ public Allocation allocate( } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation( - application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), null, null, null, + allocation.getNMTokenList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf3af..9ec7b690b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; -import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -177,35 +175,39 @@ public void run() { activateNextMasterKey(); } } - - public List createAndGetNMTokens(String applicationSubmitter, - ApplicationAttemptId appAttemptId, List containers) { + + public NMToken createAndGetNMToken(String applicationSubmitter, + ApplicationAttemptId appAttemptId, Container container) { try { this.readLock.lock(); - List nmTokens = new ArrayList(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); + NMToken nmToken = null; if (nodeSet != null) { - for (Container container : containers) { - if (!nodeSet.contains(container.getNodeId())) { + if (!nodeSet.contains(container.getNodeId())) { + if (LOG.isDebugEnabled()) { LOG.debug("Sending NMToken for nodeId : " + container.getNodeId().toString() + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); - NMToken nmToken = - NMToken.newInstance(container.getNodeId(), token); - nmTokens.add(nmToken); - // This will update the nmToken set. + } + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + nmToken = NMToken.newInstance(container.getNodeId(), token); + // The node set here is used for differentiating whether the NMToken + // has been issued for this node from the client's perspective. If + // this is an AM container, the NMToken is issued only for RM and so + // we should not update the node set. + if (container.getId().getId() != 1) { nodeSet.add(container.getNodeId()); } } } - return nmTokens; + return nmToken; } finally { this.readLock.unlock(); } } - + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { try { this.writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index a34a42b5ad..dd57cf4ace 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -598,8 +598,7 @@ private Container allocateApplicationAttempt() { applicationAttempt.handle( new RMAppAttemptContainerAllocatedEvent( - applicationAttempt.getAppAttemptId(), - container)); + applicationAttempt.getAppAttemptId())); assertEquals(RMAppAttemptState.ALLOCATED_SAVING, applicationAttempt.getAppAttemptState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 0e3bdeb2d4..86e1b1e569 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -25,20 +25,29 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.Test; @@ -149,4 +158,92 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception { Assert.assertNotNull(containers.get(0).getContainerToken()); rm1.stop(); } + + @Test + public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{ + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // acquire the container. + SecurityUtilTestHelper.setTokenServiceUseIp(true); + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // not able to fetch the container; + Assert.assertEquals(0, containers.size()); + + SecurityUtilTestHelper.setTokenServiceUseIp(false); + containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // should be able to fetch the container; + Assert.assertEquals(1, containers.size()); + } + + private volatile int numRetries = 0; + private class TestRMSecretManagerService extends RMSecretManagerService { + + public TestRMSecretManagerService(Configuration conf, + RMContextImpl rmContext) { + super(conf, rmContext); + } + @Override + protected RMContainerTokenSecretManager createContainerTokenSecretManager( + Configuration conf) { + return new RMContainerTokenSecretManager(conf) { + + @Override + public Token createContainerToken(ContainerId containerId, + NodeId nodeId, String appSubmitter, Resource capability) { + numRetries++; + return super.createContainerToken(containerId, nodeId, appSubmitter, + capability); + } + }; + } + } + + // This is to test fetching AM container will be retried, if AM container is + // not fetchable since DNS is unavailable causing container token/NMtoken + // creation failure. + @Test(timeout = 20000) + public void testAMContainerAllocationWhenDNSUnavailable() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf) { + @Override + protected RMSecretManagerService createRMSecretManagerService() { + return new TestRMSecretManagerService(conf, rmContext); + } + }; + rm1.start(); + + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + SecurityUtilTestHelper.setTokenServiceUseIp(true); + RMApp app1 = rm1.submitApp(200); + RMAppAttempt attempt = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + + // fetching am container will fail, keep retrying 5 times. + while (numRetries <= 5) { + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, + attempt.getAppAttemptState()); + System.out.println("Waiting for am container to be allocated."); + } + + SecurityUtilTestHelper.setTokenServiceUseIp(false); + rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); + MockRM.launchAndRegisterAM(app1, rm1, nm1); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java index 5b5a51fa78..fd807c9d7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java @@ -106,6 +106,17 @@ public void testTerminalRuleInMiddle() throws Exception { parse(sb.toString()); } + @Test + public void testTerminals() throws Exception { + // Should make it through without an exception + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(""); + parse(sb.toString()); + } + private QueuePlacementPolicy parse(String str) throws Exception { // Read and parse the allocations file. DocumentBuilderFactory docBuilderFactory = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 97436fb8e5..32bb0b80dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -265,7 +265,15 @@ Allocation file format its fair share before it will try to preempt containers to take resources from other queues. - * <>, which sets the default scheduling + * <>, which sets the default number + of seconds the queue is under its minimum share before it will try to preempt + containers to take resources from other queues; overriden by + minSharePreemptionTimeout element in each queue if specified. + + * <>, which sets the default running app limit + for queues; overriden by maxRunningApps element in each queue. + + * <>, which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair".