diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java index 0a20bf236b..55b3049be5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java @@ -28,7 +28,7 @@ /** * Choose volumes in round-robin order. - * Use fine-grained locks to synchronize volume choosing. + * The caller should synchronize access to the list of volumes. */ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java index 1b5a7aa052..3e8dda6d05 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java @@ -18,13 +18,14 @@ package org.apache.hadoop.ozone.container.common.impl; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; /** * Stores information about a disk/volume. @@ -36,24 +37,29 @@ public class VolumeInfo { private final Path rootDir; private final StorageType storageType; private VolumeState state; + + // Space usage calculator + private VolumeUsage usage; // Capacity configured. This is useful when we want to // limit the visible capacity for tests. If negative, then we just // query from the filesystem. private long configuredCapacity; - private volatile AtomicLong scmUsed = new AtomicLong(0); public static class Builder { + private final Configuration conf; private final Path rootDir; private StorageType storageType; private VolumeState state; private long configuredCapacity; - public Builder(Path rootDir) { + public Builder(Path rootDir, Configuration conf) { this.rootDir = rootDir; + this.conf = conf; } - public Builder(String rootDirStr) { + public Builder(String rootDirStr, Configuration conf) { this.rootDir = new Path(rootDirStr); + this.conf = conf; } public Builder storageType(StorageType storageType) { @@ -76,9 +82,17 @@ public VolumeInfo build() throws IOException { } } - private VolumeInfo(Builder b) { + private VolumeInfo(Builder b) throws IOException { this.rootDir = b.rootDir; + File root = new File(rootDir.toString()); + + Boolean succeeded = root.isDirectory() || root.mkdirs(); + + if (!succeeded) { + LOG.error("Unable to create the volume root dir at : {}", root); + throw new IOException("Unable to create the volume root dir at " + root); + } this.storageType = (b.storageType != null ? b.storageType : StorageType.DEFAULT); @@ -88,19 +102,42 @@ private VolumeInfo(Builder b) { this.state = (b.state != null ? b.state : VolumeState.NOT_FORMATTED); - LOG.info("Creating Volume : " + rootDir + " of storage type : " + + this.usage = new VolumeUsage(root, b.conf); + + LOG.info("Creating Volume : " + rootDir + " of storage type : " + storageType + " and capacity : " + configuredCapacity); } - public void addSpaceUsed(long spaceUsed) { - this.scmUsed.getAndAdd(spaceUsed); + public long getCapacity() { + return configuredCapacity < 0 ? usage.getCapacity() : configuredCapacity; } - public long getAvailable() { - return configuredCapacity - scmUsed.get(); + public long getAvailable() throws IOException { + return usage.getAvailable(); } - public void setState(VolumeState state) { + public long getScmUsed() throws IOException { + return usage.getScmUsed(); + } + + void shutdown() { + this.state = VolumeState.NON_EXISTENT; + shutdownUsageThread(); + } + + void failVolume() { + setState(VolumeState.FAILED); + shutdownUsageThread(); + } + + private void shutdownUsageThread() { + if (usage != null) { + usage.shutdown(); + } + usage = null; + } + + void setState(VolumeState state) { this.state = state; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java index 27fd657f4d..c55c84adb9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java @@ -40,7 +40,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,32 +54,28 @@ public class VolumeSet { private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class); private Configuration conf; + /** - * {@link VolumeSet#volumeList} maintains a list of active volumes in the + * {@link VolumeSet#volumeMap} maintains a map of all active volumes in the * DataNode. Each volume has one-to-one mapping with a volumeInfo object. */ - private List volumeList; - /** - * {@link VolumeSet#failedVolumeList} maintains a list of volumes which have - * failed. This list is mutually exclusive to {@link VolumeSet#volumeList}. - */ - private List failedVolumeList; - /** - * {@link VolumeSet#volumeMap} maintains a map of all volumes in the - * DataNode irrespective of their state. - */ private Map volumeMap; /** - * {@link VolumeSet#volumeStateMap} maintains a list of volumes per + * {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have + * failed. The keys in this map and {@link VolumeSet#volumeMap} are + * mutually exclusive. + */ + private Map failedVolumeMap; + /** + * {@link VolumeSet#volumeStateMap} maintains a list of active volumes per * StorageType. */ private EnumMap> volumeStateMap; /** * Lock to synchronize changes to the VolumeSet. Any update to - * {@link VolumeSet#volumeList}, {@link VolumeSet#failedVolumeList}, - * {@link VolumeSet#volumeMap} or {@link VolumeSet#volumeStateMap} should - * be done after acquiring this lock. + * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or + * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock. */ private final AutoCloseableLock volumeSetLock; @@ -100,9 +95,8 @@ public VolumeSet(Configuration conf) throws DiskOutOfSpaceException { // Add DN volumes configured through ConfigKeys to volumeMap. private void initializeVolumeSet() throws DiskOutOfSpaceException { - volumeList = new ArrayList<>(); - failedVolumeList = new ArrayList<>(); volumeMap = new ConcurrentHashMap<>(); + failedVolumeMap = new ConcurrentHashMap<>(); volumeStateMap = new EnumMap<>(StorageType.class); Collection datanodeDirs = conf.getTrimmedStringCollection( @@ -123,7 +117,6 @@ private void initializeVolumeSet() throws DiskOutOfSpaceException { try { VolumeInfo volumeInfo = getVolumeInfo(dir); - volumeList.add(volumeInfo); volumeMap.put(volumeInfo.getRootDir(), volumeInfo); volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo); } catch (IOException e) { @@ -131,7 +124,7 @@ private void initializeVolumeSet() throws DiskOutOfSpaceException { } } - if (volumeList.size() == 0) { + if (volumeMap.size() == 0) { throw new DiskOutOfSpaceException("No storage location configured"); } } @@ -148,7 +141,7 @@ private VolumeInfo getVolumeInfo(String rootDir) throws IOException { StorageLocation location = StorageLocation.parse(rootDir); StorageType storageType = location.getStorageType(); - VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir); + VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir, conf); volumeBuilder.storageType(storageType); return volumeBuilder.build(); } @@ -159,21 +152,17 @@ public void addVolume(String dataDir) throws IOException { try (AutoCloseableLock lock = volumeSetLock.acquire()) { if (volumeMap.containsKey(dirPath)) { - VolumeInfo volumeInfo = volumeMap.get(dirPath); - if (volumeInfo.isFailed()) { - volumeInfo.setState(VolumeState.NORMAL); - failedVolumeList.remove(volumeInfo); - volumeList.add(volumeInfo); - } else { - LOG.warn("Volume : " + volumeInfo.getRootDir() + " already " + - "exists in VolumeMap"); - } + LOG.warn("Volume : {} already exists in VolumeMap", dataDir); } else { - VolumeInfo volumeInfo = getVolumeInfo(dataDir); + if (failedVolumeMap.containsKey(dirPath)) { + failedVolumeMap.remove(dirPath); + } - volumeList.add(volumeInfo); - volumeMap.put(volumeInfo.getRootDir(), volumeInfo); + VolumeInfo volumeInfo = getVolumeInfo(dirPath.toString()); + volumeMap.put(dirPath, volumeInfo); volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo); + + LOG.debug("Added Volume : {} to VolumeSet", dataDir); } } } @@ -185,13 +174,17 @@ public void failVolume(String dataDir) { try (AutoCloseableLock lock = volumeSetLock.acquire()) { if (volumeMap.containsKey(dirPath)) { VolumeInfo volumeInfo = volumeMap.get(dirPath); - if (!volumeInfo.isFailed()) { - volumeInfo.setState(VolumeState.FAILED); - volumeList.remove(volumeInfo); - failedVolumeList.add(volumeInfo); - } + volumeInfo.failVolume(); + + volumeMap.remove(dirPath); + volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo); + failedVolumeMap.put(dirPath, volumeInfo); + + LOG.debug("Moving Volume : {} to failed Volumes", dataDir); + } else if (failedVolumeMap.containsKey(dirPath)) { + LOG.debug("Volume : {} is not active", dataDir); } else { - LOG.warn("Volume : " + dataDir + " does not exist in VolumeMap"); + LOG.warn("Volume : {} does not exist in VolumeSet", dataDir); } } } @@ -203,39 +196,47 @@ public void removeVolume(String dataDir) throws IOException { try (AutoCloseableLock lock = volumeSetLock.acquire()) { if (volumeMap.containsKey(dirPath)) { VolumeInfo volumeInfo = volumeMap.get(dirPath); - if (!volumeInfo.isFailed()) { - volumeList.remove(volumeInfo); - } else { - failedVolumeList.remove(volumeInfo); - } + volumeInfo.shutdown(); + volumeMap.remove(dirPath); volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo); + + LOG.debug("Removed Volume : {} from VolumeSet", dataDir); + } else if (failedVolumeMap.containsKey(dirPath)) { + VolumeInfo volumeInfo = failedVolumeMap.get(dirPath); + volumeInfo.setState(VolumeState.NON_EXISTENT); + + failedVolumeMap.remove(dirPath); + LOG.debug("Removed Volume : {} from failed VolumeSet", dataDir); } else { - LOG.warn("Volume: " + dataDir + " does not exist in " + "volumeMap."); + LOG.warn("Volume : {} does not exist in VolumeSet", dataDir); } } } - /** - * Return an iterator over {@link VolumeSet#volumeList}. - */ - public Iterator getIterator() { - return volumeList.iterator(); - } - public VolumeInfo chooseVolume(long containerSize, VolumeChoosingPolicy choosingPolicy) throws IOException { - return choosingPolicy.chooseVolume(volumeList, containerSize); + return choosingPolicy.chooseVolume(getVolumesList(), containerSize); + } + + public void shutdown() { + for (VolumeInfo volumeInfo : volumeMap.values()) { + try { + volumeInfo.shutdown(); + } catch (Exception e) { + LOG.error("Failed to shutdown volume : " + volumeInfo.getRootDir(), e); + } + } } @VisibleForTesting public List getVolumesList() { - return ImmutableList.copyOf(volumeList); + return ImmutableList.copyOf(volumeMap.values()); } @VisibleForTesting public List getFailedVolumesList() { - return ImmutableList.copyOf(failedVolumeList); + return ImmutableList.copyOf(failedVolumeMap.values()); } @VisibleForTesting diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java new file mode 100644 index 0000000000..bcd78ba9fd --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CachingGetSpaceUsed; +import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.GetSpaceUsed; +import org.apache.hadoop.io.IOUtils; +import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +/** + * Class that wraps the space df of the Datanode Volumes used by SCM + * containers. + */ +public class VolumeUsage { + private static final Logger LOG = LoggerFactory.getLogger(VolumeUsage.class); + + private final File rootDir; + private final DF df; + private final File scmUsedFile; + private GetSpaceUsed scmUsage; + private Runnable shutdownHook; + + private static final String DU_CACHE_FILE = "scmUsed"; + private volatile boolean scmUsedSaved = false; + + VolumeUsage(File dataLoc, Configuration conf) + throws IOException { + this.rootDir = dataLoc; + + // SCM used cache file + scmUsedFile = new File(rootDir, DU_CACHE_FILE); + // get overall disk df + this.df = new DF(rootDir, conf); + + startScmUsageThread(conf); + } + + void startScmUsageThread(Configuration conf) throws IOException { + // get SCM specific df + this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(rootDir) + .setConf(conf) + .setInitialUsed(loadScmUsed()) + .build(); + + // Ensure scm df is saved during shutdown. + shutdownHook = () -> { + if (!scmUsedSaved) { + saveScmUsed(); + } + }; + ShutdownHookManager.get().addShutdownHook(shutdownHook, + SHUTDOWN_HOOK_PRIORITY); + } + + long getCapacity() { + long capacity = df.getCapacity(); + return (capacity > 0) ? capacity : 0; + } + + /* + * Calculate the available space in the volume. + */ + long getAvailable() throws IOException { + long remaining = getCapacity() - getScmUsed(); + long available = df.getAvailable(); + if (remaining > available) { + remaining = available; + } + return (remaining > 0) ? remaining : 0; + } + + long getScmUsed() throws IOException{ + return scmUsage.getUsed(); + } + + public void shutdown() { + saveScmUsed(); + scmUsedSaved = true; + + if (shutdownHook != null) { + ShutdownHookManager.get().removeShutdownHook(shutdownHook); + } + + if (scmUsage instanceof CachingGetSpaceUsed) { + IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage)); + } + } + + /** + * Read in the cached DU value and return it if it is less than 600 seconds + * old (DU update interval). Slight imprecision of scmUsed is not critical + * and skipping DU can significantly shorten the startup time. + * If the cached value is not available or too old, -1 is returned. + */ + long loadScmUsed() { + long cachedScmUsed; + long mtime; + Scanner sc; + + try { + sc = new Scanner(scmUsedFile, "UTF-8"); + } catch (FileNotFoundException fnfe) { + return -1; + } + + try { + // Get the recorded scmUsed from the file. + if (sc.hasNextLong()) { + cachedScmUsed = sc.nextLong(); + } else { + return -1; + } + // Get the recorded mtime from the file. + if (sc.hasNextLong()) { + mtime = sc.nextLong(); + } else { + return -1; + } + + // Return the cached value if mtime is okay. + if (mtime > 0 && (Time.now() - mtime < 600000L)) { + LOG.info("Cached ScmUsed found for {} : {} ", rootDir, + cachedScmUsed); + return cachedScmUsed; + } + return -1; + } finally { + sc.close(); + } + } + + /** + * Write the current scmUsed to the cache file. + */ + void saveScmUsed() { + if (scmUsedFile.exists() && !scmUsedFile.delete()) { + LOG.warn("Failed to delete old scmUsed file in {}.", rootDir); + } + OutputStreamWriter out = null; + try { + long used = getScmUsed(); + if (used > 0) { + out = new OutputStreamWriter(new FileOutputStream(scmUsedFile), + StandardCharsets.UTF_8); + // mtime is written last, so that truncated writes won't be valid. + out.write(Long.toString(used) + " " + Long.toString(Time.now())); + out.flush(); + out.close(); + out = null; + } + } catch (IOException ioe) { + // If write failed, the volume might be bad. Since the cache file is + // not critical, log the error and continue. + LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe); + } finally { + IOUtils.cleanupWithLogger(null, out); + } + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestVolumeSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestVolumeSet.java index 5a1bc791cf..ceeacff5af 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestVolumeSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestVolumeSet.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ozone.container.common.impl.VolumeSet; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Rule; @@ -73,8 +74,10 @@ public void testVolumeSetInitialization() throws Exception { // VolumeSet initialization should add volume1 and volume2 to VolumeSet assertEquals("VolumeSet intialization is incorrect", volumesList.size(), volumes.size()); - assertEquals(volume1, volumesList.get(0).getRootDir().toString()); - assertEquals(volume2, volumesList.get(1).getRootDir().toString()); + assertTrue("VolumeSet not initailized correctly", + checkVolumeExistsInVolumeSet(volume1)); + assertTrue("VolumeSet not initailized correctly", + checkVolumeExistsInVolumeSet(volume2)); } @Test @@ -88,9 +91,8 @@ public void testAddVolume() throws Exception { volumeSet.addVolume(volume3); assertEquals(3, volumeSet.getVolumesList().size()); - assertEquals("AddVolume did not add requested volume to VolumeSet", - volume3, - volumeSet.getVolumesList().get(2).getRootDir().toString()); + assertTrue("AddVolume did not add requested volume to VolumeSet", + checkVolumeExistsInVolumeSet(volume3)); } @Test @@ -103,15 +105,15 @@ public void testFailVolume() throws Exception { assertEquals(1, volumeSet.getVolumesList().size()); // Failed volume should be added to FailedVolumeList - assertEquals("Failed volume not present in FailedVolumeList", + assertEquals("Failed volume not present in FailedVolumeMap", 1, volumeSet.getFailedVolumesList().size()); assertEquals("Failed Volume list did not match", volume1, volumeSet.getFailedVolumesList().get(0).getRootDir().toString()); + assertTrue(volumeSet.getFailedVolumesList().get(0).isFailed()); - // Failed volume should exist in VolumeMap with isFailed flag set to true + // Failed volume should not exist in VolumeMap Path volume1Path = new Path(volume1); - assertTrue(volumeSet.getVolumeMap().containsKey(volume1Path)); - assertTrue(volumeSet.getVolumeMap().get(volume1Path).isFailed()); + assertFalse(volumeSet.getVolumeMap().containsKey(volume1Path)); } @Test @@ -130,9 +132,18 @@ public void testRemoveVolume() throws Exception { LogFactory.getLog(VolumeSet.class)); volumeSet.removeVolume(volume1); assertEquals(1, volumeSet.getVolumesList().size()); - String expectedLogMessage = "Volume: " + volume1 + " does not exist in " - + "volumeMap."; + String expectedLogMessage = "Volume : " + volume1 + " does not exist in " + + "VolumeSet"; assertTrue("Log output does not contain expected log message: " + expectedLogMessage, logs.getOutput().contains(expectedLogMessage)); } + + private boolean checkVolumeExistsInVolumeSet(String volume) { + for (VolumeInfo volumeInfo : volumeSet.getVolumesList()) { + if (volumeInfo.getRootDir().toString().equals(volume)) { + return true; + } + } + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java index 8cbc0587b0..e9fa37bdea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java @@ -32,7 +32,7 @@ public interface VolumeChoosingPolicy { * Choose a volume to place a replica, * given a list of volumes and the replica size sought for storage. * - * The implementations of this interface must be thread-safe. + * The caller should synchronize access to the list of volumes. * * @param volumes - a list of available volumes. * @param replicaSize - the size of the replica for which a volume is sought.