HDDS-140. Add DU usage to VolumeInfo.

This commit is contained in:
Hanisha Koneru 2018-06-06 15:36:36 -07:00
parent 977c8cd166
commit 772c95395b
6 changed files with 317 additions and 79 deletions

View File

@ -28,7 +28,7 @@
/**
* Choose volumes in round-robin order.
* Use fine-grained locks to synchronize volume choosing.
* The caller should synchronize access to the list of volumes.
*/
public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Stores information about a disk/volume.
@ -36,24 +37,29 @@ public class VolumeInfo {
private final Path rootDir;
private final StorageType storageType;
private VolumeState state;
// Space usage calculator
private VolumeUsage usage;
// Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
private long configuredCapacity;
private volatile AtomicLong scmUsed = new AtomicLong(0);
public static class Builder {
private final Configuration conf;
private final Path rootDir;
private StorageType storageType;
private VolumeState state;
private long configuredCapacity;
public Builder(Path rootDir) {
public Builder(Path rootDir, Configuration conf) {
this.rootDir = rootDir;
this.conf = conf;
}
public Builder(String rootDirStr) {
public Builder(String rootDirStr, Configuration conf) {
this.rootDir = new Path(rootDirStr);
this.conf = conf;
}
public Builder storageType(StorageType storageType) {
@ -76,9 +82,17 @@ public VolumeInfo build() throws IOException {
}
}
private VolumeInfo(Builder b) {
private VolumeInfo(Builder b) throws IOException {
this.rootDir = b.rootDir;
File root = new File(rootDir.toString());
Boolean succeeded = root.isDirectory() || root.mkdirs();
if (!succeeded) {
LOG.error("Unable to create the volume root dir at : {}", root);
throw new IOException("Unable to create the volume root dir at " + root);
}
this.storageType = (b.storageType != null ?
b.storageType : StorageType.DEFAULT);
@ -88,19 +102,42 @@ private VolumeInfo(Builder b) {
this.state = (b.state != null ? b.state : VolumeState.NOT_FORMATTED);
LOG.info("Creating Volume : " + rootDir + " of storage type : " +
this.usage = new VolumeUsage(root, b.conf);
LOG.info("Creating Volume : " + rootDir + " of storage type : " +
storageType + " and capacity : " + configuredCapacity);
}
public void addSpaceUsed(long spaceUsed) {
this.scmUsed.getAndAdd(spaceUsed);
public long getCapacity() {
return configuredCapacity < 0 ? usage.getCapacity() : configuredCapacity;
}
public long getAvailable() {
return configuredCapacity - scmUsed.get();
public long getAvailable() throws IOException {
return usage.getAvailable();
}
public void setState(VolumeState state) {
public long getScmUsed() throws IOException {
return usage.getScmUsed();
}
void shutdown() {
this.state = VolumeState.NON_EXISTENT;
shutdownUsageThread();
}
void failVolume() {
setState(VolumeState.FAILED);
shutdownUsageThread();
}
private void shutdownUsageThread() {
if (usage != null) {
usage.shutdown();
}
usage = null;
}
void setState(VolumeState state) {
this.state = state;
}

View File

@ -40,7 +40,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -55,32 +54,28 @@ public class VolumeSet {
private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
private Configuration conf;
/**
* {@link VolumeSet#volumeList} maintains a list of active volumes in the
* {@link VolumeSet#volumeMap} maintains a map of all active volumes in the
* DataNode. Each volume has one-to-one mapping with a volumeInfo object.
*/
private List<VolumeInfo> volumeList;
/**
* {@link VolumeSet#failedVolumeList} maintains a list of volumes which have
* failed. This list is mutually exclusive to {@link VolumeSet#volumeList}.
*/
private List<VolumeInfo> failedVolumeList;
/**
* {@link VolumeSet#volumeMap} maintains a map of all volumes in the
* DataNode irrespective of their state.
*/
private Map<Path, VolumeInfo> volumeMap;
/**
* {@link VolumeSet#volumeStateMap} maintains a list of volumes per
* {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have
* failed. The keys in this map and {@link VolumeSet#volumeMap} are
* mutually exclusive.
*/
private Map<Path, VolumeInfo> failedVolumeMap;
/**
* {@link VolumeSet#volumeStateMap} maintains a list of active volumes per
* StorageType.
*/
private EnumMap<StorageType, List<VolumeInfo>> volumeStateMap;
/**
* Lock to synchronize changes to the VolumeSet. Any update to
* {@link VolumeSet#volumeList}, {@link VolumeSet#failedVolumeList},
* {@link VolumeSet#volumeMap} or {@link VolumeSet#volumeStateMap} should
* be done after acquiring this lock.
* {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
* {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
*/
private final AutoCloseableLock volumeSetLock;
@ -100,9 +95,8 @@ public VolumeSet(Configuration conf) throws DiskOutOfSpaceException {
// Add DN volumes configured through ConfigKeys to volumeMap.
private void initializeVolumeSet() throws DiskOutOfSpaceException {
volumeList = new ArrayList<>();
failedVolumeList = new ArrayList<>();
volumeMap = new ConcurrentHashMap<>();
failedVolumeMap = new ConcurrentHashMap<>();
volumeStateMap = new EnumMap<>(StorageType.class);
Collection<String> datanodeDirs = conf.getTrimmedStringCollection(
@ -123,7 +117,6 @@ private void initializeVolumeSet() throws DiskOutOfSpaceException {
try {
VolumeInfo volumeInfo = getVolumeInfo(dir);
volumeList.add(volumeInfo);
volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
} catch (IOException e) {
@ -131,7 +124,7 @@ private void initializeVolumeSet() throws DiskOutOfSpaceException {
}
}
if (volumeList.size() == 0) {
if (volumeMap.size() == 0) {
throw new DiskOutOfSpaceException("No storage location configured");
}
}
@ -148,7 +141,7 @@ private VolumeInfo getVolumeInfo(String rootDir) throws IOException {
StorageLocation location = StorageLocation.parse(rootDir);
StorageType storageType = location.getStorageType();
VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir);
VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir, conf);
volumeBuilder.storageType(storageType);
return volumeBuilder.build();
}
@ -159,21 +152,17 @@ public void addVolume(String dataDir) throws IOException {
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
if (volumeMap.containsKey(dirPath)) {
VolumeInfo volumeInfo = volumeMap.get(dirPath);
if (volumeInfo.isFailed()) {
volumeInfo.setState(VolumeState.NORMAL);
failedVolumeList.remove(volumeInfo);
volumeList.add(volumeInfo);
} else {
LOG.warn("Volume : " + volumeInfo.getRootDir() + " already " +
"exists in VolumeMap");
}
LOG.warn("Volume : {} already exists in VolumeMap", dataDir);
} else {
VolumeInfo volumeInfo = getVolumeInfo(dataDir);
if (failedVolumeMap.containsKey(dirPath)) {
failedVolumeMap.remove(dirPath);
}
volumeList.add(volumeInfo);
volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
VolumeInfo volumeInfo = getVolumeInfo(dirPath.toString());
volumeMap.put(dirPath, volumeInfo);
volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
LOG.debug("Added Volume : {} to VolumeSet", dataDir);
}
}
}
@ -185,13 +174,17 @@ public void failVolume(String dataDir) {
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
if (volumeMap.containsKey(dirPath)) {
VolumeInfo volumeInfo = volumeMap.get(dirPath);
if (!volumeInfo.isFailed()) {
volumeInfo.setState(VolumeState.FAILED);
volumeList.remove(volumeInfo);
failedVolumeList.add(volumeInfo);
}
volumeInfo.failVolume();
volumeMap.remove(dirPath);
volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
failedVolumeMap.put(dirPath, volumeInfo);
LOG.debug("Moving Volume : {} to failed Volumes", dataDir);
} else if (failedVolumeMap.containsKey(dirPath)) {
LOG.debug("Volume : {} is not active", dataDir);
} else {
LOG.warn("Volume : " + dataDir + " does not exist in VolumeMap");
LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
}
}
}
@ -203,39 +196,47 @@ public void removeVolume(String dataDir) throws IOException {
try (AutoCloseableLock lock = volumeSetLock.acquire()) {
if (volumeMap.containsKey(dirPath)) {
VolumeInfo volumeInfo = volumeMap.get(dirPath);
if (!volumeInfo.isFailed()) {
volumeList.remove(volumeInfo);
} else {
failedVolumeList.remove(volumeInfo);
}
volumeInfo.shutdown();
volumeMap.remove(dirPath);
volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
LOG.debug("Removed Volume : {} from VolumeSet", dataDir);
} else if (failedVolumeMap.containsKey(dirPath)) {
VolumeInfo volumeInfo = failedVolumeMap.get(dirPath);
volumeInfo.setState(VolumeState.NON_EXISTENT);
failedVolumeMap.remove(dirPath);
LOG.debug("Removed Volume : {} from failed VolumeSet", dataDir);
} else {
LOG.warn("Volume: " + dataDir + " does not exist in " + "volumeMap.");
LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
}
}
}
/**
* Return an iterator over {@link VolumeSet#volumeList}.
*/
public Iterator<VolumeInfo> getIterator() {
return volumeList.iterator();
}
public VolumeInfo chooseVolume(long containerSize,
VolumeChoosingPolicy choosingPolicy) throws IOException {
return choosingPolicy.chooseVolume(volumeList, containerSize);
return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
}
public void shutdown() {
for (VolumeInfo volumeInfo : volumeMap.values()) {
try {
volumeInfo.shutdown();
} catch (Exception e) {
LOG.error("Failed to shutdown volume : " + volumeInfo.getRootDir(), e);
}
}
}
@VisibleForTesting
public List<VolumeInfo> getVolumesList() {
return ImmutableList.copyOf(volumeList);
return ImmutableList.copyOf(volumeMap.values());
}
@VisibleForTesting
public List<VolumeInfo> getFailedVolumesList() {
return ImmutableList.copyOf(failedVolumeList);
return ImmutableList.copyOf(failedVolumeMap.values());
}
@VisibleForTesting

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
* Class that wraps the space df of the Datanode Volumes used by SCM
* containers.
*/
public class VolumeUsage {
private static final Logger LOG = LoggerFactory.getLogger(VolumeUsage.class);
private final File rootDir;
private final DF df;
private final File scmUsedFile;
private GetSpaceUsed scmUsage;
private Runnable shutdownHook;
private static final String DU_CACHE_FILE = "scmUsed";
private volatile boolean scmUsedSaved = false;
VolumeUsage(File dataLoc, Configuration conf)
throws IOException {
this.rootDir = dataLoc;
// SCM used cache file
scmUsedFile = new File(rootDir, DU_CACHE_FILE);
// get overall disk df
this.df = new DF(rootDir, conf);
startScmUsageThread(conf);
}
void startScmUsageThread(Configuration conf) throws IOException {
// get SCM specific df
this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(rootDir)
.setConf(conf)
.setInitialUsed(loadScmUsed())
.build();
// Ensure scm df is saved during shutdown.
shutdownHook = () -> {
if (!scmUsedSaved) {
saveScmUsed();
}
};
ShutdownHookManager.get().addShutdownHook(shutdownHook,
SHUTDOWN_HOOK_PRIORITY);
}
long getCapacity() {
long capacity = df.getCapacity();
return (capacity > 0) ? capacity : 0;
}
/*
* Calculate the available space in the volume.
*/
long getAvailable() throws IOException {
long remaining = getCapacity() - getScmUsed();
long available = df.getAvailable();
if (remaining > available) {
remaining = available;
}
return (remaining > 0) ? remaining : 0;
}
long getScmUsed() throws IOException{
return scmUsage.getUsed();
}
public void shutdown() {
saveScmUsed();
scmUsedSaved = true;
if (shutdownHook != null) {
ShutdownHookManager.get().removeShutdownHook(shutdownHook);
}
if (scmUsage instanceof CachingGetSpaceUsed) {
IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
}
}
/**
* Read in the cached DU value and return it if it is less than 600 seconds
* old (DU update interval). Slight imprecision of scmUsed is not critical
* and skipping DU can significantly shorten the startup time.
* If the cached value is not available or too old, -1 is returned.
*/
long loadScmUsed() {
long cachedScmUsed;
long mtime;
Scanner sc;
try {
sc = new Scanner(scmUsedFile, "UTF-8");
} catch (FileNotFoundException fnfe) {
return -1;
}
try {
// Get the recorded scmUsed from the file.
if (sc.hasNextLong()) {
cachedScmUsed = sc.nextLong();
} else {
return -1;
}
// Get the recorded mtime from the file.
if (sc.hasNextLong()) {
mtime = sc.nextLong();
} else {
return -1;
}
// Return the cached value if mtime is okay.
if (mtime > 0 && (Time.now() - mtime < 600000L)) {
LOG.info("Cached ScmUsed found for {} : {} ", rootDir,
cachedScmUsed);
return cachedScmUsed;
}
return -1;
} finally {
sc.close();
}
}
/**
* Write the current scmUsed to the cache file.
*/
void saveScmUsed() {
if (scmUsedFile.exists() && !scmUsedFile.delete()) {
LOG.warn("Failed to delete old scmUsed file in {}.", rootDir);
}
OutputStreamWriter out = null;
try {
long used = getScmUsed();
if (used > 0) {
out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
StandardCharsets.UTF_8);
// mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(Time.now()));
out.flush();
out.close();
out = null;
}
} catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is
// not critical, log the error and continue.
LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
} finally {
IOUtils.cleanupWithLogger(null, out);
}
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.ozone.container.common.impl.VolumeSet;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Rule;
@ -73,8 +74,10 @@ public void testVolumeSetInitialization() throws Exception {
// VolumeSet initialization should add volume1 and volume2 to VolumeSet
assertEquals("VolumeSet intialization is incorrect",
volumesList.size(), volumes.size());
assertEquals(volume1, volumesList.get(0).getRootDir().toString());
assertEquals(volume2, volumesList.get(1).getRootDir().toString());
assertTrue("VolumeSet not initailized correctly",
checkVolumeExistsInVolumeSet(volume1));
assertTrue("VolumeSet not initailized correctly",
checkVolumeExistsInVolumeSet(volume2));
}
@Test
@ -88,9 +91,8 @@ public void testAddVolume() throws Exception {
volumeSet.addVolume(volume3);
assertEquals(3, volumeSet.getVolumesList().size());
assertEquals("AddVolume did not add requested volume to VolumeSet",
volume3,
volumeSet.getVolumesList().get(2).getRootDir().toString());
assertTrue("AddVolume did not add requested volume to VolumeSet",
checkVolumeExistsInVolumeSet(volume3));
}
@Test
@ -103,15 +105,15 @@ public void testFailVolume() throws Exception {
assertEquals(1, volumeSet.getVolumesList().size());
// Failed volume should be added to FailedVolumeList
assertEquals("Failed volume not present in FailedVolumeList",
assertEquals("Failed volume not present in FailedVolumeMap",
1, volumeSet.getFailedVolumesList().size());
assertEquals("Failed Volume list did not match", volume1,
volumeSet.getFailedVolumesList().get(0).getRootDir().toString());
assertTrue(volumeSet.getFailedVolumesList().get(0).isFailed());
// Failed volume should exist in VolumeMap with isFailed flag set to true
// Failed volume should not exist in VolumeMap
Path volume1Path = new Path(volume1);
assertTrue(volumeSet.getVolumeMap().containsKey(volume1Path));
assertTrue(volumeSet.getVolumeMap().get(volume1Path).isFailed());
assertFalse(volumeSet.getVolumeMap().containsKey(volume1Path));
}
@Test
@ -130,9 +132,18 @@ public void testRemoveVolume() throws Exception {
LogFactory.getLog(VolumeSet.class));
volumeSet.removeVolume(volume1);
assertEquals(1, volumeSet.getVolumesList().size());
String expectedLogMessage = "Volume: " + volume1 + " does not exist in "
+ "volumeMap.";
String expectedLogMessage = "Volume : " + volume1 + " does not exist in "
+ "VolumeSet";
assertTrue("Log output does not contain expected log message: "
+ expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
}
private boolean checkVolumeExistsInVolumeSet(String volume) {
for (VolumeInfo volumeInfo : volumeSet.getVolumesList()) {
if (volumeInfo.getRootDir().toString().equals(volume)) {
return true;
}
}
return false;
}
}

View File

@ -32,7 +32,7 @@ public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
* Choose a volume to place a replica,
* given a list of volumes and the replica size sought for storage.
*
* The implementations of this interface must be thread-safe.
* The caller should synchronize access to the list of volumes.
*
* @param volumes - a list of available volumes.
* @param replicaSize - the size of the replica for which a volume is sought.