Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-20 16:13:17 -07:00
commit e257b6dbb1
4 changed files with 80 additions and 102 deletions

View File

@ -340,6 +340,8 @@ Trunk (Unreleased)
HDFS-7088. Archival Storage: fix TestBalancer and HDFS-7088. Archival Storage: fix TestBalancer and
TestBalancerWithMultipleNameNodes. (szetszwo via jing9) TestBalancerWithMultipleNameNodes. (szetszwo via jing9)
HDFS-7095. TestStorageMover often fails in Jenkins. (jing9)
Release 2.6.0 - UNRELEASED Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -69,7 +69,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
// Capacity configured. This is useful when we want to // Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just // limit the visible capacity for tests. If negative, then we just
// query from the filesystem. // query from the filesystem.
protected long configuredCapacity; protected volatile long configuredCapacity;
/** /**
* Per-volume worker pool that processes new blocks to cache. * Per-volume worker pool that processes new blocks to cache.

View File

@ -75,8 +75,10 @@ private StorageMap() {
private void add(Source source, StorageGroup target) { private void add(Source source, StorageGroup target) {
sources.put(source); sources.put(source);
targets.put(target); if (target != null) {
getTargetStorages(target.getStorageType()).add(target); targets.put(target);
getTargetStorages(target.getStorageType()).add(target);
}
} }
private Source getSource(MLocation ml) { private Source getSource(MLocation ml) {
@ -126,12 +128,11 @@ void init() throws IOException {
for(DatanodeStorageReport r : reports) { for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
for(StorageType t : StorageType.asList()) { for(StorageType t : StorageType.asList()) {
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
final long maxRemaining = getMaxRemaining(r, t); final long maxRemaining = getMaxRemaining(r, t);
if (maxRemaining > 0L) { final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); maxRemaining) : null;
final StorageGroup target = dn.addTarget(t, maxRemaining); storages.add(source, target);
storages.add(source, target);
}
} }
} }
} }
@ -155,7 +156,10 @@ private ExitStatus run() {
DBlock newDBlock(Block block, List<MLocation> locations) { DBlock newDBlock(Block block, List<MLocation> locations) {
final DBlock db = new DBlock(block); final DBlock db = new DBlock(block);
for(MLocation ml : locations) { for(MLocation ml : locations) {
db.addLocation(storages.getTarget(ml)); StorageGroup source = storages.getSource(ml);
if (source != null) {
db.addLocation(source);
}
} }
return db; return db;
} }
@ -349,7 +353,7 @@ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
for (final StorageType t : diff.existing) { for (final StorageType t : diff.existing) {
for (final MLocation ml : locations) { for (final MLocation ml : locations) {
final Source source = storages.getSource(ml); final Source source = storages.getSource(ml);
if (ml.storageType == t) { if (ml.storageType == t && source != null) {
// try to schedule one replica move. // try to schedule one replica move.
if (scheduleMoveReplica(db, source, diff.expected)) { if (scheduleMoveReplica(db, source, diff.expected)) {
return true; return true;
@ -363,7 +367,9 @@ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
@VisibleForTesting @VisibleForTesting
boolean scheduleMoveReplica(DBlock db, MLocation ml, boolean scheduleMoveReplica(DBlock db, MLocation ml,
List<StorageType> targetTypes) { List<StorageType> targetTypes) {
return scheduleMoveReplica(db, storages.getSource(ml), targetTypes); final Source source = storages.getSource(ml);
return source == null ? false : scheduleMoveReplica(db,
storages.getSource(ml), targetTypes);
} }
boolean scheduleMoveReplica(DBlock db, Source source, boolean scheduleMoveReplica(DBlock db, Source source,

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.mover; package org.apache.hadoop.hdfs.server.mover;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
@ -26,10 +27,12 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -55,6 +58,8 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -599,6 +604,18 @@ private void waitForAllReplicas(int expectedReplicaNum, Path file,
} }
} }
private void setVolumeFull(DataNode dn, StorageType type) {
List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
if (volume.getStorageType() == type) {
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
+ volume.getStorageID());
volume.setCapacityForTesting(0);
}
}
}
/** /**
* Test DISK is running out of spaces. * Test DISK is running out of spaces.
*/ */
@ -608,76 +625,51 @@ public void testNoSpaceDisk() throws Exception {
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
* BLOCK_SIZE;
final long archiveCapacity = 100 * BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity);
Configuration conf = new Configuration(DEFAULT_CONF); Configuration conf = new Configuration(DEFAULT_CONF);
final ClusterScheme clusterScheme = new ClusterScheme(conf, final ClusterScheme clusterScheme = new ClusterScheme(conf,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
try { try {
test.runBasicTest(false); test.runBasicTest(false);
// create hot files with replication 3 until not more spaces. // create 2 hot files with replication 3
final short replication = 3; final short replication = 3;
{ for (int i = 0; i < 2; i++) {
int hotFileCount = 0; final Path p = new Path(pathPolicyMap.hot, "file" + i);
try {
for (; ; hotFileCount++) {
final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
waitForAllReplicas(replication, p, test.dfs);
}
} catch (IOException e) {
LOG.info("Expected: hotFileCount=" + hotFileCount, e);
}
Assert.assertTrue(hotFileCount >= 1);
}
// create hot files with replication 1 to use up all remaining spaces.
{
int hotFileCount_r1 = 0;
try {
for (; ; hotFileCount_r1++) {
final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
waitForAllReplicas(1, p, test.dfs);
}
} catch (IOException e) {
LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
}
}
{ // test increasing replication. Since DISK is full,
// new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0);
final short newReplication = (short) 5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk);
}
{ // test creating a cold file and then increase replication
final Path p = new Path(pathPolicyMap.cold, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
test.verifyReplication(p, 0, replication); waitForAllReplicas(replication, p, test.dfs);
final short newReplication = 5;
test.dfs.setReplication(p, newReplication);
Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication);
} }
{ //test move a hot file to warm // set all the DISK volume to full
final Path file1 = new Path(pathPolicyMap.hot, "file1"); for (DataNode dn : test.cluster.getDataNodes()) {
test.dfs.rename(file1, pathPolicyMap.warm); setVolumeFull(dn, StorageType.DISK);
test.migrate(); DataNodeTestUtils.triggerHeartbeat(dn);
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
} }
// test increasing replication. Since DISK is full,
// new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0);
final short newReplication = (short) 5;
test.dfs.setReplication(file0, newReplication);
Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk);
// test creating a cold file and then increase replication
final Path p = new Path(pathPolicyMap.cold, "foo");
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
test.verifyReplication(p, 0, replication);
test.dfs.setReplication(p, newReplication);
Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication);
//test move a hot file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
} finally { } finally {
test.shutdownCluster(); test.shutdownCluster();
} }
@ -692,53 +684,31 @@ public void testNoSpaceArchive() throws Exception {
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = 100 * BLOCK_SIZE;
final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
* BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities); NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
try { try {
test.runBasicTest(false); test.runBasicTest(false);
// create cold files with replication 3 until not more spaces. // create 2 hot files with replication 3
final short replication = 3; final short replication = 3;
{ for (int i = 0; i < 2; i++) {
int coldFileCount = 0; final Path p = new Path(pathPolicyMap.cold, "file" + i);
try { DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
for (; ; coldFileCount++) { waitForAllReplicas(replication, p, test.dfs);
final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
waitForAllReplicas(replication, p, test.dfs);
}
} catch (IOException e) {
LOG.info("Expected: coldFileCount=" + coldFileCount, e);
}
Assert.assertTrue(coldFileCount >= 1);
} }
// create cold files with replication 1 to use up all remaining spaces. // set all the ARCHIVE volume to full
{ for (DataNode dn : test.cluster.getDataNodes()) {
int coldFileCount_r1 = 0; setVolumeFull(dn, StorageType.ARCHIVE);
try { DataNodeTestUtils.triggerHeartbeat(dn);
for (; ; coldFileCount_r1++) {
final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1);
DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
waitForAllReplicas(1, p, test.dfs);
}
} catch (IOException e) {
LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
}
} }
{ // test increasing replication but new replicas cannot be created { // test increasing replication but new replicas cannot be created
// since no more ARCHIVE space. // since no more ARCHIVE space.
final Path file0 = new Path(pathPolicyMap.cold, "file0"); final Path file0 = new Path(pathPolicyMap.cold, "file0");
final Replication r = test.getReplication(file0); final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
Assert.assertEquals(0, r.disk); Assert.assertEquals(0, r.disk);
final short newReplication = (short) 5; final short newReplication = (short) 5;