diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9babe96f51..9fa06d4966 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -340,6 +340,8 @@ Trunk (Unreleased) HDFS-7088. Archival Storage: fix TestBalancer and TestBalancerWithMultipleNameNodes. (szetszwo via jing9) + HDFS-7095. TestStorageMover often fails in Jenkins. (jing9) + Release 2.6.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 3952c39159..63bc6a1bce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -69,7 +69,7 @@ public class FsVolumeImpl implements FsVolumeSpi { // Capacity configured. This is useful when we want to // limit the visible capacity for tests. If negative, then we just // query from the filesystem. - protected long configuredCapacity; + protected volatile long configuredCapacity; /** * Per-volume worker pool that processes new blocks to cache. @@ -129,7 +129,7 @@ void decDfsUsed(String bpid, long value) { } } } - + long getDfsUsed() throws IOException { long dfsUsed = 0; synchronized(dataset) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index f1837ae0d4..858db1d684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -75,8 +75,10 @@ private StorageMap() { private void add(Source source, StorageGroup target) { sources.put(source); - targets.put(target); - getTargetStorages(target.getStorageType()).add(target); + if (target != null) { + targets.put(target); + getTargetStorages(target.getStorageType()).add(target); + } } private Source getSource(MLocation ml) { @@ -126,12 +128,11 @@ void init() throws IOException { for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); for(StorageType t : StorageType.asList()) { + final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); final long maxRemaining = getMaxRemaining(r, t); - if (maxRemaining > 0L) { - final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); - final StorageGroup target = dn.addTarget(t, maxRemaining); - storages.add(source, target); - } + final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t, + maxRemaining) : null; + storages.add(source, target); } } } @@ -155,7 +156,10 @@ private ExitStatus run() { DBlock newDBlock(Block block, List locations) { final DBlock db = new DBlock(block); for(MLocation ml : locations) { - db.addLocation(storages.getTarget(ml)); + StorageGroup source = storages.getSource(ml); + if (source != null) { + db.addLocation(source); + } } return db; } @@ -349,7 +353,7 @@ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { for (final StorageType t : diff.existing) { for (final MLocation ml : locations) { final Source source = storages.getSource(ml); - if (ml.storageType == t) { + if (ml.storageType == t && source != null) { // try to schedule one replica move. if (scheduleMoveReplica(db, source, diff.expected)) { return true; @@ -363,7 +367,9 @@ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { @VisibleForTesting boolean scheduleMoveReplica(DBlock db, MLocation ml, List targetTypes) { - return scheduleMoveReplica(db, storages.getSource(ml), targetTypes); + final Source source = storages.getSource(ml); + return source == null ? false : scheduleMoveReplica(db, + storages.getSource(ml), targetTypes); } boolean scheduleMoveReplica(DBlock db, Source source, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index ad813cb7fe..e40f142e5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.mover; +import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -26,10 +27,12 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -55,6 +58,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; @@ -599,6 +604,18 @@ private void waitForAllReplicas(int expectedReplicaNum, Path file, } } + private void setVolumeFull(DataNode dn, StorageType type) { + List volumes = dn.getFSDataset().getVolumes(); + for (int j = 0; j < volumes.size(); ++j) { + FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j); + if (volume.getStorageType() == type) { + LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]" + + volume.getStorageID()); + volume.setCapacityForTesting(0); + } + } + } + /** * Test DISK is running out of spaces. */ @@ -608,76 +625,51 @@ public void testNoSpaceDisk() throws Exception { final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); - final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE) - * BLOCK_SIZE; - final long archiveCapacity = 100 * BLOCK_SIZE; - final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, - diskCapacity, archiveCapacity); Configuration conf = new Configuration(DEFAULT_CONF); final ClusterScheme clusterScheme = new ClusterScheme(conf, - NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); try { test.runBasicTest(false); - // create hot files with replication 3 until not more spaces. + // create 2 hot files with replication 3 final short replication = 3; - { - int hotFileCount = 0; - try { - for (; ; hotFileCount++) { - final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); - waitForAllReplicas(replication, p, test.dfs); - } - } catch (IOException e) { - LOG.info("Expected: hotFileCount=" + hotFileCount, e); - } - Assert.assertTrue(hotFileCount >= 1); - } - - // create hot files with replication 1 to use up all remaining spaces. - { - int hotFileCount_r1 = 0; - try { - for (; ; hotFileCount_r1++) { - final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L); - waitForAllReplicas(1, p, test.dfs); - } - } catch (IOException e) { - LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e); - } - } - - { // test increasing replication. Since DISK is full, - // new replicas should be stored in ARCHIVE as a fallback storage. - final Path file0 = new Path(pathPolicyMap.hot, "file0"); - final Replication r = test.getReplication(file0); - final short newReplication = (short) 5; - test.dfs.setReplication(file0, newReplication); - Thread.sleep(10000); - test.verifyReplication(file0, r.disk, newReplication - r.disk); - } - - { // test creating a cold file and then increase replication - final Path p = new Path(pathPolicyMap.cold, "foo"); + for (int i = 0; i < 2; i++) { + final Path p = new Path(pathPolicyMap.hot, "file" + i); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); - test.verifyReplication(p, 0, replication); - - final short newReplication = 5; - test.dfs.setReplication(p, newReplication); - Thread.sleep(10000); - test.verifyReplication(p, 0, newReplication); + waitForAllReplicas(replication, p, test.dfs); } - { //test move a hot file to warm - final Path file1 = new Path(pathPolicyMap.hot, "file1"); - test.dfs.rename(file1, pathPolicyMap.warm); - test.migrate(); - test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId()); + // set all the DISK volume to full + for (DataNode dn : test.cluster.getDataNodes()) { + setVolumeFull(dn, StorageType.DISK); + DataNodeTestUtils.triggerHeartbeat(dn); } + + // test increasing replication. Since DISK is full, + // new replicas should be stored in ARCHIVE as a fallback storage. + final Path file0 = new Path(pathPolicyMap.hot, "file0"); + final Replication r = test.getReplication(file0); + final short newReplication = (short) 5; + test.dfs.setReplication(file0, newReplication); + Thread.sleep(10000); + test.verifyReplication(file0, r.disk, newReplication - r.disk); + + // test creating a cold file and then increase replication + final Path p = new Path(pathPolicyMap.cold, "foo"); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + test.verifyReplication(p, 0, replication); + + test.dfs.setReplication(p, newReplication); + Thread.sleep(10000); + test.verifyReplication(p, 0, newReplication); + + //test move a hot file to warm + final Path file1 = new Path(pathPolicyMap.hot, "file1"); + test.dfs.rename(file1, pathPolicyMap.warm); + test.migrate(); + test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId()); } finally { test.shutdownCluster(); } @@ -692,53 +684,31 @@ public void testNoSpaceArchive() throws Exception { final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); - final long diskCapacity = 100 * BLOCK_SIZE; - final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE) - * BLOCK_SIZE; - final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, - diskCapacity, archiveCapacity); final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, - NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); try { test.runBasicTest(false); - // create cold files with replication 3 until not more spaces. + // create 2 hot files with replication 3 final short replication = 3; - { - int coldFileCount = 0; - try { - for (; ; coldFileCount++) { - final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); - waitForAllReplicas(replication, p, test.dfs); - } - } catch (IOException e) { - LOG.info("Expected: coldFileCount=" + coldFileCount, e); - } - Assert.assertTrue(coldFileCount >= 1); + for (int i = 0; i < 2; i++) { + final Path p = new Path(pathPolicyMap.cold, "file" + i); + DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); + waitForAllReplicas(replication, p, test.dfs); } - // create cold files with replication 1 to use up all remaining spaces. - { - int coldFileCount_r1 = 0; - try { - for (; ; coldFileCount_r1++) { - final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1); - DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L); - waitForAllReplicas(1, p, test.dfs); - } - } catch (IOException e) { - LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e); - } + // set all the ARCHIVE volume to full + for (DataNode dn : test.cluster.getDataNodes()) { + setVolumeFull(dn, StorageType.ARCHIVE); + DataNodeTestUtils.triggerHeartbeat(dn); } { // test increasing replication but new replicas cannot be created // since no more ARCHIVE space. final Path file0 = new Path(pathPolicyMap.cold, "file0"); final Replication r = test.getReplication(file0); - LOG.info("XXX " + file0 + ": replication=" + r); Assert.assertEquals(0, r.disk); final short newReplication = (short) 5;