HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. Contributed by Wei Zhou.

This commit is contained in:
Andrew Wang 2016-02-17 11:29:10 -08:00
parent 77f7ca3e94
commit 3a23dc683c
4 changed files with 131 additions and 16 deletions

View File

@ -1021,6 +1021,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9691. TestBlockManagerSafeMode#testCheckSafeMode fails intermittently. HDFS-9691. TestBlockManagerSafeMode#testCheckSafeMode fails intermittently.
(Mingliang Liu via aajisaka) (Mingliang Liu via aajisaka)
HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
(Wei Zhou via wang)
Release 2.8.0 - UNRELEASED Release 2.8.0 - UNRELEASED
NEW FEATURES NEW FEATURES

View File

@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/** /**
@ -39,12 +40,16 @@
* new replica allocation. By default this policy prefers assigning replicas to * new replica allocation. By default this policy prefers assigning replicas to
* those volumes with more available free space, so as to over time balance the * those volumes with more available free space, so as to over time balance the
* available space of all the volumes within a DN. * available space of all the volumes within a DN.
* Use fine-grained locks to enable choosing volumes of different storage
* types concurrently.
*/ */
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi> public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V>, Configurable { implements VolumeChoosingPolicy<V>, Configurable {
private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class); private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
private Object[] syncLocks;
private final Random random; private final Random random;
private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT; private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
@ -52,14 +57,24 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
AvailableSpaceVolumeChoosingPolicy(Random random) { AvailableSpaceVolumeChoosingPolicy(Random random) {
this.random = random; this.random = random;
initLocks();
} }
public AvailableSpaceVolumeChoosingPolicy() { public AvailableSpaceVolumeChoosingPolicy() {
this(new Random()); this(new Random());
initLocks();
}
private void initLocks() {
int numStorageTypes = StorageType.values().length;
syncLocks = new Object[numStorageTypes];
for (int i = 0; i < numStorageTypes; i++) {
syncLocks[i] = new Object();
}
} }
@Override @Override
public synchronized void setConf(Configuration conf) { public void setConf(Configuration conf) {
balancedSpaceThreshold = conf.getLong( balancedSpaceThreshold = conf.getLong(
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY, DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY,
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT); DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT);
@ -85,7 +100,7 @@ public synchronized void setConf(Configuration conf) {
} }
@Override @Override
public synchronized Configuration getConf() { public Configuration getConf() {
// Nothing to do. Only added to fulfill the Configurable contract. // Nothing to do. Only added to fulfill the Configurable contract.
return null; return null;
} }
@ -98,12 +113,24 @@ public synchronized Configuration getConf() {
new RoundRobinVolumeChoosingPolicy<V>(); new RoundRobinVolumeChoosingPolicy<V>();
@Override @Override
public synchronized V chooseVolume(List<V> volumes, public V chooseVolume(List<V> volumes,
long replicaSize) throws IOException { long replicaSize) throws IOException {
if (volumes.size() < 1) { if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes"); throw new DiskOutOfSpaceException("No more available volumes");
} }
// As all the items in volumes are with the same storage type,
// so only need to get the storage type index of the first item in volumes
StorageType storageType = volumes.get(0).getStorageType();
int index = storageType != null ?
storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) {
return doChooseVolume(volumes, replicaSize);
}
}
private V doChooseVolume(final List<V> volumes,
long replicaSize) throws IOException {
AvailableSpaceVolumeList volumesWithSpaces = AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes); new AvailableSpaceVolumeList(volumes);

View File

@ -22,30 +22,58 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/** /**
* Choose volumes in round-robin order. * Choose volumes with the same storage type in round-robin order.
* Use fine-grained locks to synchronize volume choosing.
*/ */
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi> public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> { implements VolumeChoosingPolicy<V> {
public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class); public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
private int curVolume = 0; // curVolumes stores the RR counters of each storage type.
// The ordinal of storage type in org.apache.hadoop.fs.StorageType
// is used as the index to get data from the array.
private int[] curVolumes;
// syncLocks stores the locks for each storage type.
private Object[] syncLocks;
public RoundRobinVolumeChoosingPolicy() {
int numStorageTypes = StorageType.values().length;
curVolumes = new int[numStorageTypes];
syncLocks = new Object[numStorageTypes];
for (int i = 0; i < numStorageTypes; i++) {
syncLocks[i] = new Object();
}
}
@Override @Override
public synchronized V chooseVolume(final List<V> volumes, long blockSize) public V chooseVolume(final List<V> volumes, long blockSize)
throws IOException { throws IOException {
if (volumes.size() < 1) { if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes"); throw new DiskOutOfSpaceException("No more available volumes");
} }
// As all the items in volumes are with the same storage type,
// so only need to get the storage type index of the first item in volumes
StorageType storageType = volumes.get(0).getStorageType();
int index = storageType != null ?
storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) {
return chooseVolume(index, volumes, blockSize);
}
}
private V chooseVolume(final int curVolumeIndex, final List<V> volumes,
long blockSize) throws IOException {
// since volumes could've been removed because of the failure // since volumes could've been removed because of the failure
// make sure we are not out of bounds // make sure we are not out of bounds
if(curVolume >= volumes.size()) { int curVolume = curVolumes[curVolumeIndex] < volumes.size()
curVolume = 0; ? curVolumes[curVolumeIndex] : 0;
}
int startVolume = curVolume; int startVolume = curVolume;
long maxAvailable = 0; long maxAvailable = 0;
@ -55,6 +83,7 @@ public synchronized V chooseVolume(final List<V> volumes, long blockSize)
curVolume = (curVolume + 1) % volumes.size(); curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable(); long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) { if (availableVolumeSize > blockSize) {
curVolumes[curVolumeIndex] = curVolume;
return volume; return volume;
} }

View File

@ -21,6 +21,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert; import org.junit.Assert;
@ -102,4 +103,59 @@ public static void testRRPolicyExceptionMessage(
} }
} }
// Test Round-Robin choosing algorithm with heterogeneous storage.
@Test
public void testRRPolicyWithStorageTypes() throws Exception {
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
= new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
testRRPolicyWithStorageTypes(policy);
}
public static void testRRPolicyWithStorageTypes(
VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
final List<FsVolumeSpi> diskVolumes = new ArrayList<FsVolumeSpi>();
final List<FsVolumeSpi> ssdVolumes = new ArrayList<FsVolumeSpi>();
// Add two DISK volumes to diskVolumes
diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(diskVolumes.get(0).getStorageType())
.thenReturn(StorageType.DISK);
Mockito.when(diskVolumes.get(0).getAvailable()).thenReturn(100L);
diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(diskVolumes.get(1).getStorageType())
.thenReturn(StorageType.DISK);
Mockito.when(diskVolumes.get(1).getAvailable()).thenReturn(100L);
// Add two SSD volumes to ssdVolumes
ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(ssdVolumes.get(0).getStorageType())
.thenReturn(StorageType.SSD);
Mockito.when(ssdVolumes.get(0).getAvailable()).thenReturn(200L);
ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(ssdVolumes.get(1).getStorageType())
.thenReturn(StorageType.SSD);
Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
Assert.assertEquals(diskVolumes.get(0),
policy.chooseVolume(diskVolumes, 0));
// Independent Round-Robin for different storage type
Assert.assertEquals(ssdVolumes.get(0),
policy.chooseVolume(ssdVolumes, 0));
// Take block size into consideration
Assert.assertEquals(ssdVolumes.get(0),
policy.chooseVolume(ssdVolumes, 150L));
Assert.assertEquals(diskVolumes.get(1),
policy.chooseVolume(diskVolumes, 0));
Assert.assertEquals(diskVolumes.get(0),
policy.chooseVolume(diskVolumes, 50L));
try {
policy.chooseVolume(diskVolumes, 200L);
Assert.fail("Should throw an DiskOutOfSpaceException before this!");
} catch (DiskOutOfSpaceException e) {
// Pass.
}
}
} }