From 9b0aace1e6c54f201784912c0b623707aa82b761 Mon Sep 17 00:00:00 2001 From: Rakesh Radhakrishnan Date: Wed, 8 May 2019 17:20:21 +0530 Subject: [PATCH] HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 -- .../hadoop/hdfs/server/datanode/DNConf.java | 22 -- .../fsdataset/impl/FsDatasetCache.java | 31 +-- .../fsdataset/impl/FsDatasetImpl.java | 2 + .../fsdataset/impl/MappableBlockLoader.java | 19 +- .../impl/MappableBlockLoaderFactory.java | 47 +++++ .../impl/MemoryMappableBlockLoader.java | 21 +- .../impl/PmemMappableBlockLoader.java | 40 ++-- .../fsdataset/impl/PmemMappedBlock.java | 10 +- .../fsdataset/impl/PmemVolumeManager.java | 197 +++++++++++++----- .../src/main/resources/hdfs-default.xml | 24 --- .../TestCacheByPmemMappableBlockLoader.java | 26 +-- .../fsdataset/impl/TestFsDatasetCache.java | 5 +- 13 files changed, 256 insertions(+), 202 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f9a4ab2b38..e9d0eec8d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -26,8 +26,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator; import org.apache.hadoop.hdfs.web.URLConnectionFactory; @@ -392,22 +390,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms"; public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L; - // Currently, the available cache loaders are MemoryMappableBlockLoader, - // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache - // loader to cache block replica to memory. - public static final String DFS_DATANODE_CACHE_LOADER_CLASS = - "dfs.datanode.cache.loader.class"; - public static final Class - DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT = - MemoryMappableBlockLoader.class; // Multiple dirs separated by "," are acceptable. public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY = "dfs.datanode.cache.pmem.dirs"; public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = ""; - // The cache capacity of persistent memory - public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY = - "dfs.datanode.cache.pmem.capacity"; - public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L; public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index b4bf86896d..00ac64c30d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -27,10 +27,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; @@ -71,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader; import org.apache.hadoop.security.SaslPropertiesResolver; import java.util.concurrent.TimeUnit; @@ -121,9 +116,7 @@ public class DNConf { final long xceiverStopTimeout; final long restartReplicaExpiry; - private final Class cacheLoaderClass; final long maxLockedMemory; - private final long maxLockedPmem; private final String[] pmemDirs; private final long bpReadyTimeout; @@ -266,17 +259,10 @@ public DNConf(final Configurable dn) { DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); - this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS, - DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class); - this.maxLockedMemory = getConf().getLongBytes( DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT); - this.maxLockedPmem = getConf().getLongBytes( - DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, - DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT); - this.pmemDirs = getConf().getTrimmedStrings( DFS_DATANODE_CACHE_PMEM_DIRS_KEY); @@ -342,10 +328,6 @@ public long getMaxLockedMemory() { return maxLockedMemory; } - public long getMaxLockedPmem() { - return maxLockedPmem; - } - /** * Returns true if connect to datanode via hostname * @@ -449,10 +431,6 @@ int getMaxDataLength() { return maxDataLength; } - public Class getCacheLoaderClass() { - return cacheLoaderClass; - } - public String[] getPmemVolumes() { return pmemDirs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index dce84c2650..4fab214a05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,9 +182,8 @@ public FsDatasetCache(FsDatasetImpl dataset) throws IOException { this.memCacheStats = new MemoryCacheStats( dataset.datanode.getDnConf().getMaxLockedMemory()); - Class cacheLoaderClass = - dataset.datanode.getDnConf().getCacheLoaderClass(); - this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null); + this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader( + this.getDnConf()); cacheLoader.initialize(this); } @@ -213,7 +211,7 @@ String getReplicaCachePath(String bpid, long blockId) { return null; } ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); - return cacheLoader.getCachedPath(key); + return PmemVolumeManager.getInstance().getCachePath(key); } /** @@ -380,14 +378,13 @@ public void run() { MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); - long newUsedBytes = cacheLoader.reserve(length); + long newUsedBytes = cacheLoader.reserve(key, length); boolean reservedBytes = false; try { if (newUsedBytes < 0) { - LOG.warn("Failed to cache " + key + ": could not reserve " + length + - " more bytes in the cache: " + - cacheLoader.getCacheCapacityConfigKey() + - " of " + cacheLoader.getCacheCapacity() + " exceeded."); + LOG.warn("Failed to cache " + key + ": could not reserve " + + "more bytes in the cache: " + cacheLoader.getCacheCapacity() + + " exceeded when try to reserve " + length + "bytes."); return; } reservedBytes = true; @@ -442,10 +439,10 @@ public void run() { IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { - cacheLoader.release(length); + cacheLoader.release(key, length); } LOG.debug("Caching of {} was aborted. We are now caching only {} " - + "bytes in total.", key, memCacheStats.getCacheUsed()); + + "bytes in total.", key, cacheLoader.getCacheUsed()); IOUtils.closeQuietly(mappableBlock); numBlocksFailedToCache.incrementAndGet(); @@ -519,7 +516,8 @@ public void run() { synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); } - long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength()); + long newUsedBytes = cacheLoader. + release(key, value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { @@ -592,4 +590,11 @@ public synchronized boolean isCached(String bpid, long blockId) { MappableBlockLoader getCacheLoader() { return cacheLoader; } + + /** + * This method can be executed during DataNode shutdown. + */ + void shutdown() { + cacheLoader.shutdown(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 47f0c0111f..80738d3dcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -2328,6 +2328,8 @@ public void shutdown() { "from LazyWriter.join"); } } + + cacheManager.shutdown(); } @Override // FSDatasetMBean diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java index a9e9610172..044e5c5927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java @@ -65,26 +65,25 @@ abstract MappableBlock load(long length, FileInputStream blockIn, /** * Try to reserve some given bytes. * + * @param key The ExtendedBlockId for a block. + * * @param bytesCount The number of bytes to add. * * @return The new number of usedBytes if we succeeded; * -1 if we failed. */ - abstract long reserve(long bytesCount); + abstract long reserve(ExtendedBlockId key, long bytesCount); /** * Release some bytes that we're using. * + * @param key The ExtendedBlockId for a block. + * * @param bytesCount The number of bytes to release. * * @return The new number of usedBytes. */ - abstract long release(long bytesCount); - - /** - * Get the config key of cache capacity. - */ - abstract String getCacheCapacityConfigKey(); + abstract long release(ExtendedBlockId key, long bytesCount); /** * Get the approximate amount of cache space used. @@ -102,9 +101,11 @@ abstract MappableBlock load(long length, FileInputStream blockIn, abstract boolean isTransientCache(); /** - * Get a cache file path if applicable. Otherwise return null. + * Clean up cache, can be used during DataNode shutdown. */ - abstract String getCachedPath(ExtendedBlockId key); + void shutdown() { + // Do nothing. + } /** * Reads bytes into a buffer until EOF or the buffer's limit is reached. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java new file mode 100644 index 0000000000..43b1b531af --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.DNConf; + +/** + * Creates MappableBlockLoader. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class MappableBlockLoaderFactory { + + private MappableBlockLoaderFactory() { + // Prevent instantiation + } + + /** + * Create a specific cache loader according to the configuration. + * If persistent memory volume is not configured, return a cache loader + * for DRAM cache. Otherwise, return a cache loader for pmem cache. + */ + public static MappableBlockLoader createCacheLoader(DNConf conf) { + if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) { + return new MemoryMappableBlockLoader(); + } + return new PmemMappableBlockLoader(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java index 4b7af19513..919835a5ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java @@ -22,11 +22,12 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; @@ -42,11 +43,13 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class MemoryMappableBlockLoader extends MappableBlockLoader { - + private static final Logger LOG = + LoggerFactory.getLogger(MemoryMappableBlockLoader.class); private MemoryCacheStats memCacheStats; @Override void initialize(FsDatasetCache cacheManager) throws IOException { + LOG.info("Initializing cache loader: MemoryMappableBlockLoader."); this.memCacheStats = cacheManager.getMemCacheStats(); } @@ -148,11 +151,6 @@ private void verifyChecksum(long length, FileInputStream metaIn, } } - @Override - public String getCacheCapacityConfigKey() { - return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; - } - @Override public long getCacheUsed() { return memCacheStats.getCacheUsed(); @@ -164,12 +162,12 @@ public long getCacheCapacity() { } @Override - long reserve(long bytesCount) { + long reserve(ExtendedBlockId key, long bytesCount) { return memCacheStats.reserve(bytesCount); } @Override - long release(long bytesCount) { + long release(ExtendedBlockId key, long bytesCount) { return memCacheStats.release(bytesCount); } @@ -177,9 +175,4 @@ long release(long bytesCount) { public boolean isTransientCache() { return true; } - - @Override - public String getCachedPath(ExtendedBlockId key) { - return null; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java index c581d3101a..05a9ba717e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java @@ -18,12 +18,10 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DNConf; @@ -53,14 +51,10 @@ public class PmemMappableBlockLoader extends MappableBlockLoader { @Override void initialize(FsDatasetCache cacheManager) throws IOException { + LOG.info("Initializing cache loader: PmemMappableBlockLoader."); DNConf dnConf = cacheManager.getDnConf(); - this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(), - dnConf.getPmemVolumes()); - } - - @VisibleForTesting - PmemVolumeManager getPmemVolumeManager() { - return pmemVolumeManager; + PmemVolumeManager.init(dnConf.getPmemVolumes()); + pmemVolumeManager = PmemVolumeManager.getInstance(); } /** @@ -69,7 +63,7 @@ PmemVolumeManager getPmemVolumeManager() { * Map the block and verify its checksum. * * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir - * is a persistent memory volume selected by getOneLocation() method. + * is a persistent memory volume chosen by PmemVolumeManager. * * @param length The current length of the block. * @param blockIn The block input stream. Should be positioned at the @@ -100,8 +94,7 @@ MappableBlock load(long length, FileInputStream blockIn, throw new IOException("Block InputStream has no FileChannel."); } - Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex(); - filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key); + filePath = pmemVolumeManager.getCachePath(key); file = new RandomAccessFile(filePath, "rw"); out = file.getChannel(). map(FileChannel.MapMode.READ_WRITE, 0, length); @@ -111,9 +104,7 @@ MappableBlock load(long length, FileInputStream blockIn, } verifyChecksumAndMapBlock(out, length, metaIn, blockChannel, blockFileName); - mappableBlock = new PmemMappedBlock( - length, volumeIndex, key, pmemVolumeManager); - pmemVolumeManager.afterCache(key, volumeIndex); + mappableBlock = new PmemMappedBlock(length, key); LOG.info("Successfully cached one replica:{} into persistent memory" + ", [cached path={}, length={}]", key, filePath, length); } finally { @@ -123,6 +114,7 @@ MappableBlock load(long length, FileInputStream blockIn, } IOUtils.closeQuietly(file); if (mappableBlock == null) { + LOG.debug("Delete {} due to unsuccessful mapping.", filePath); FsDatasetUtil.deleteMappedFile(filePath); } } @@ -193,11 +185,6 @@ private void verifyChecksumAndMapBlock( } } - @Override - public String getCacheCapacityConfigKey() { - return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; - } - @Override public long getCacheUsed() { return pmemVolumeManager.getCacheUsed(); @@ -209,13 +196,13 @@ public long getCacheCapacity() { } @Override - long reserve(long bytesCount) { - return pmemVolumeManager.reserve(bytesCount); + long reserve(ExtendedBlockId key, long bytesCount) { + return pmemVolumeManager.reserve(key, bytesCount); } @Override - long release(long bytesCount) { - return pmemVolumeManager.release(bytesCount); + long release(ExtendedBlockId key, long bytesCount) { + return pmemVolumeManager.release(key, bytesCount); } @Override @@ -224,7 +211,8 @@ public boolean isTransientCache() { } @Override - public String getCachedPath(ExtendedBlockId key) { - return pmemVolumeManager.getCacheFilePath(key); + void shutdown() { + LOG.info("Clean up cache on persistent memory during shutdown."); + PmemVolumeManager.getInstance().cleanup(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java index ce4fa22137..25c3d400bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java @@ -35,18 +35,13 @@ public class PmemMappedBlock implements MappableBlock { private static final Logger LOG = LoggerFactory.getLogger(PmemMappedBlock.class); - private final PmemVolumeManager pmemVolumeManager; private long length; - private Byte volumeIndex = null; private ExtendedBlockId key; - PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key, - PmemVolumeManager pmemVolumeManager) { + PmemMappedBlock(long length, ExtendedBlockId key) { assert length > 0; this.length = length; - this.volumeIndex = volumeIndex; this.key = key; - this.pmemVolumeManager = pmemVolumeManager; } @Override @@ -57,10 +52,9 @@ public long getLength() { @Override public void close() { String cacheFilePath = - pmemVolumeManager.inferCacheFilePath(volumeIndex, key); + PmemVolumeManager.getInstance().getCachePath(key); try { FsDatasetUtil.deleteMappedFile(cacheFilePath); - pmemVolumeManager.afterUncache(key); LOG.info("Successfully uncached one replica:{} from persistent memory" + ", [cached path={}, length={}]", key, cacheFilePath, length); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java index 76aa2dd5c8..2d77f7ad96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java @@ -35,6 +35,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -45,14 +46,19 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class PmemVolumeManager { +public final class PmemVolumeManager { /** * Counts used bytes for persistent memory. */ - private class UsedBytesCount { + private static class UsedBytesCount { + private final long maxBytes; private final AtomicLong usedBytes = new AtomicLong(0); + UsedBytesCount(long maxBytes) { + this.maxBytes = maxBytes; + } + /** * Try to reserve more bytes. * @@ -65,7 +71,7 @@ long reserve(long bytesCount) { while (true) { long cur = usedBytes.get(); long next = cur + bytesCount; - if (next > cacheCapacity) { + if (next > maxBytes) { return -1; } if (usedBytes.compareAndSet(cur, next)) { @@ -85,42 +91,76 @@ long release(long bytesCount) { return usedBytes.addAndGet(-bytesCount); } - long get() { + long getUsedBytes() { return usedBytes.get(); } + + long getMaxBytes() { + return maxBytes; + } + + long getAvailableBytes() { + return maxBytes - usedBytes.get(); + } } private static final Logger LOG = LoggerFactory.getLogger(PmemVolumeManager.class); + public static final String CACHE_DIR = "hdfs_pmem_cache"; + private static PmemVolumeManager pmemVolumeManager = null; private final ArrayList pmemVolumes = new ArrayList<>(); // Maintain which pmem volume a block is cached to. private final Map blockKeyToVolume = new ConcurrentHashMap<>(); - private final UsedBytesCount usedBytesCount; + private final List usedBytesCounts = new ArrayList<>(); /** * The total cache capacity in bytes of persistent memory. - * It is 0L if the specific mappableBlockLoader couldn't cache data to pmem. */ - private final long cacheCapacity; + private long cacheCapacity; + private static long maxBytesPerPmem = -1; private int count = 0; - // Strict atomic operation is not guaranteed for the performance sake. - private int i = 0; + private byte nextIndex = 0; - PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured) - throws IOException { - if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) { + private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException { + if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) { throw new IOException("The persistent memory volume, " + DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY + " is not configured!"); } - this.loadVolumes(pmemVolumesConfigured); - this.usedBytesCount = new UsedBytesCount(); - this.cacheCapacity = maxBytes; + this.loadVolumes(pmemVolumesConfig); + cacheCapacity = 0L; + for (UsedBytesCount counter : usedBytesCounts) { + cacheCapacity += counter.getMaxBytes(); + } + } + + public synchronized static void init(String[] pmemVolumesConfig) + throws IOException { + if (pmemVolumeManager == null) { + pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig); + } + } + + public static PmemVolumeManager getInstance() { + if (pmemVolumeManager == null) { + throw new RuntimeException( + "The pmemVolumeManager should be instantiated!"); + } + return pmemVolumeManager; + } + + @VisibleForTesting + public static void setMaxBytes(long maxBytes) { + maxBytesPerPmem = maxBytes; } public long getCacheUsed() { - return usedBytesCount.get(); + long usedBytes = 0L; + for (UsedBytesCount counter : usedBytesCounts) { + usedBytes += counter.getUsedBytes(); + } + return usedBytes; } public long getCacheCapacity() { @@ -130,24 +170,40 @@ public long getCacheCapacity() { /** * Try to reserve more bytes on persistent memory. * + * @param key The ExtendedBlockId for a block. + * * @param bytesCount The number of bytes to add. * * @return The new number of usedBytes if we succeeded; * -1 if we failed. */ - long reserve(long bytesCount) { - return usedBytesCount.reserve(bytesCount); + synchronized long reserve(ExtendedBlockId key, long bytesCount) { + try { + byte index = chooseVolume(bytesCount); + long usedBytes = usedBytesCounts.get(index).reserve(bytesCount); + // Put the entry into blockKeyToVolume if reserving bytes succeeded. + if (usedBytes > 0) { + blockKeyToVolume.put(key, index); + } + return usedBytes; + } catch (IOException e) { + LOG.warn(e.getMessage()); + return -1L; + } } /** * Release some bytes that we're using on persistent memory. * + * @param key The ExtendedBlockId for a block. + * * @param bytesCount The number of bytes to release. * * @return The new number of usedBytes. */ - long release(long bytesCount) { - return usedBytesCount.release(bytesCount); + long release(ExtendedBlockId key, long bytesCount) { + Byte index = blockKeyToVolume.remove(key); + return usedBytesCounts.get(index).release(bytesCount); } /** @@ -155,46 +211,70 @@ long release(long bytesCount) { * * @throws IOException If there is no available pmem volume. */ - private void loadVolumes(String[] volumes) throws IOException { + private void loadVolumes(String[] volumes) + throws IOException { // Check whether the volume exists - for (String volume: volumes) { + for (byte n = 0; n < volumes.length; n++) { try { - File pmemDir = new File(volume); - verifyIfValidPmemVolume(pmemDir); - // Remove all files under the volume. - FileUtils.cleanDirectory(pmemDir); + File pmemDir = new File(volumes[n]); + File realPmemDir = verifyIfValidPmemVolume(pmemDir); + this.pmemVolumes.add(realPmemDir.getPath()); + long maxBytes; + if (maxBytesPerPmem == -1) { + maxBytes = realPmemDir.getUsableSpace(); + } else { + maxBytes = maxBytesPerPmem; + } + UsedBytesCount usedBytesCount = new UsedBytesCount(maxBytes); + this.usedBytesCounts.add(usedBytesCount); + LOG.info("Added persistent memory - {} with size={}", + volumes[n], maxBytes); } catch (IllegalArgumentException e) { - LOG.error("Failed to parse persistent memory volume " + volume, e); + LOG.error("Failed to parse persistent memory volume " + volumes[n], e); continue; } catch (IOException e) { - LOG.error("Bad persistent memory volume: " + volume, e); + LOG.error("Bad persistent memory volume: " + volumes[n], e); continue; } - pmemVolumes.add(volume); - LOG.info("Added persistent memory - " + volume); } count = pmemVolumes.size(); if (count == 0) { throw new IOException( "At least one valid persistent memory volume is required!"); } + cleanup(); + } + + void cleanup() { + // Remove all files under the volume. + for (String pmemDir: pmemVolumes) { + try { + FileUtils.cleanDirectory(new File(pmemDir)); + } catch (IOException e) { + LOG.error("Failed to clean up " + pmemDir, e); + } + } } @VisibleForTesting - static void verifyIfValidPmemVolume(File pmemDir) + static File verifyIfValidPmemVolume(File pmemDir) throws IOException { if (!pmemDir.exists()) { final String message = pmemDir + " does not exist"; throw new IOException(message); } - if (!pmemDir.isDirectory()) { final String message = pmemDir + " is not a directory"; throw new IllegalArgumentException(message); } + File realPmemDir = new File(getRealPmemDir(pmemDir.getPath())); + if (!realPmemDir.exists() && !realPmemDir.mkdir()) { + throw new IOException("Failed to create " + realPmemDir.getPath()); + } + String uuidStr = UUID.randomUUID().toString(); - String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr; + String testFilePath = realPmemDir.getPath() + "/.verify.pmem." + uuidStr; byte[] contents = uuidStr.getBytes("UTF-8"); RandomAccessFile testFile = null; MappedByteBuffer out = null; @@ -203,15 +283,17 @@ static void verifyIfValidPmemVolume(File pmemDir) out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, contents.length); if (out == null) { - throw new IOException("Failed to map the test file under " + pmemDir); + throw new IOException( + "Failed to map the test file under " + realPmemDir); } out.put(contents); // Forces to write data to storage device containing the mapped file out.force(); + return realPmemDir; } catch (IOException e) { throw new IOException( "Exception while writing data to persistent storage dir: " + - pmemDir, e); + realPmemDir, e); } finally { if (out != null) { out.clear(); @@ -229,18 +311,38 @@ static void verifyIfValidPmemVolume(File pmemDir) } } + public static String getRealPmemDir(String rawPmemDir) { + return new File(rawPmemDir, CACHE_DIR).getAbsolutePath(); + } + /** * Choose a persistent memory volume based on a specific algorithm. * Currently it is a round-robin policy. * * TODO: Refine volume selection policy by considering storage utilization. */ - Byte getOneVolumeIndex() throws IOException { - if (count != 0) { - return (byte)(i++ % count); - } else { + synchronized Byte chooseVolume(long bytesCount) throws IOException { + if (count == 0) { throw new IOException("No usable persistent memory is found"); } + int k = 0; + long maxAvailableSpace = 0L; + while (k++ != count) { + if (nextIndex == count) { + nextIndex = 0; + } + byte index = nextIndex++; + long availableBytes = usedBytesCounts.get(index).getAvailableBytes(); + if (availableBytes >= bytesCount) { + return index; + } + if (availableBytes > maxAvailableSpace) { + maxAvailableSpace = availableBytes; + } + } + throw new IOException("There is no enough persistent memory space " + + "for caching. The current max available space is " + + maxAvailableSpace + ", but " + bytesCount + "is required."); } @VisibleForTesting @@ -276,7 +378,7 @@ public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) { /** * The cache file path is pmemVolume/BlockPoolId-BlockId. */ - public String getCacheFilePath(ExtendedBlockId key) { + public String getCachePath(ExtendedBlockId key) { Byte volumeIndex = blockKeyToVolume.get(key); if (volumeIndex == null) { return null; @@ -288,19 +390,4 @@ public String getCacheFilePath(ExtendedBlockId key) { Map getBlockKeyToVolume() { return blockKeyToVolume; } - - /** - * Add cached block's ExtendedBlockId and its cache volume index to a map - * after cache. - */ - public void afterCache(ExtendedBlockId key, Byte volumeIndex) { - blockKeyToVolume.put(key, volumeIndex); - } - - /** - * Remove the record in blockKeyToVolume for uncached block after uncache. - */ - public void afterUncache(ExtendedBlockId key) { - blockKeyToVolume.remove(key); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ba80a3fe3b..8c575af90c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2526,18 +2526,6 @@ - - dfs.datanode.cache.loader.class - org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader - - Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader. - By default, MemoryMappableBlockLoader is used and it maps block replica into memory. - PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is - implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent - memory directory. - - - dfs.datanode.max.locked.memory 0 @@ -2555,18 +2543,6 @@ - - dfs.datanode.cache.pmem.capacity - 0 - - The amount of persistent memory in bytes that can be used to cache block - replicas to persistent memory. Currently, persistent memory is only enabled - in HDFS Centralized Cache Management feature. - - By default, this parameter is 0, which disables persistent memory caching. - - - dfs.datanode.cache.pmem.dirs diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java index 9b4f06fc9d..58812db7b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java @@ -21,8 +21,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; @@ -139,14 +137,11 @@ public void setUp() throws Exception { conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10); // Configuration for pmem cache - conf.set(DFS_DATANODE_CACHE_LOADER_CLASS, - "org.apache.hadoop.hdfs.server.datanode." + - "fsdataset.impl.PmemMappableBlockLoader"); new File(PMEM_DIR_0).getAbsoluteFile().mkdir(); new File(PMEM_DIR_1).getAbsoluteFile().mkdir(); // Configure two bogus pmem volumes conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1); - conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY); + PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5)); prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); @@ -183,18 +178,17 @@ protected static void shutdownCluster() { @Test public void testPmemVolumeManager() throws IOException { - PmemVolumeManager pmemVolumeManager = - cacheLoader.getPmemVolumeManager(); + PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance(); assertNotNull(pmemVolumeManager); assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity()); // Test round-robin selection policy long count1 = 0, count2 = 0; for (int i = 0; i < 10; i++) { - Byte index = pmemVolumeManager.getOneVolumeIndex(); + Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE); String volume = pmemVolumeManager.getVolumeByIndex(index); - if (volume.equals(PMEM_DIR_0)) { + if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) { count1++; - } else if (volume.equals(PMEM_DIR_1)) { + } else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) { count2++; } else { fail("Unexpected persistent storage location:" + volume); @@ -254,7 +248,7 @@ public Boolean get() { // The pmem cache space is expected to have been used up. assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed()); Map blockKeyToVolume = - cacheLoader.getPmemVolumeManager().getBlockKeyToVolume(); + PmemVolumeManager.getInstance().getBlockKeyToVolume(); // All block keys should be kept in blockKeyToVolume assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum); assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys)); @@ -266,11 +260,13 @@ public Boolean get() { // to pmem. assertNotNull(cachePath); String expectFileName = - cacheLoader.getPmemVolumeManager().getCacheFileName(key); + PmemVolumeManager.getInstance().getCacheFileName(key); if (cachePath.startsWith(PMEM_DIR_0)) { - assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName)); + assertTrue(cachePath.equals(PmemVolumeManager + .getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName)); } else if (cachePath.startsWith(PMEM_DIR_1)) { - assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName)); + assertTrue(cachePath.equals(PmemVolumeManager + .getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName)); } else { fail("The cache path is not the expected one: " + cachePath); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java index 8f8da109e7..ac9c2fc832 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java @@ -401,9 +401,10 @@ public void testFilesExceedMaxLockedMemory() throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { + // check the log reported by FsDataSetCache + // in the case that cache capacity is exceeded. int lines = appender.countLinesWithMessage( - "more bytes in the cache: " + - DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY); + "could not reserve more bytes in the cache: "); return lines > 0; } }, 500, 30000);