HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.

This commit is contained in:
Rakesh Radhakrishnan 2019-05-08 17:20:21 +05:30
parent 96dc5cedfe
commit 9b0aace1e6
13 changed files with 256 additions and 202 deletions

View File

@ -26,8 +26,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@ -392,22 +390,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
// Currently, the available cache loaders are MemoryMappableBlockLoader,
// PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
// loader to cache block replica to memory.
public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
"dfs.datanode.cache.loader.class";
public static final Class<? extends MappableBlockLoader>
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
MemoryMappableBlockLoader.class;
// Multiple dirs separated by "," are acceptable.
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
"dfs.datanode.cache.pmem.dirs";
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
// The cache capacity of persistent memory
public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
"dfs.datanode.cache.pmem.capacity";
public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

View File

@ -27,10 +27,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
@ -71,7 +67,6 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
@ -121,9 +116,7 @@ public class DNConf {
final long xceiverStopTimeout;
final long restartReplicaExpiry;
private final Class<? extends MappableBlockLoader> cacheLoaderClass;
final long maxLockedMemory;
private final long maxLockedPmem;
private final String[] pmemDirs;
private final long bpReadyTimeout;
@ -266,17 +259,10 @@ public DNConf(final Configurable dn) {
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
this.maxLockedMemory = getConf().getLongBytes(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
this.maxLockedPmem = getConf().getLongBytes(
DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
this.pmemDirs = getConf().getTrimmedStrings(
DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
@ -342,10 +328,6 @@ public long getMaxLockedMemory() {
return maxLockedMemory;
}
public long getMaxLockedPmem() {
return maxLockedPmem;
}
/**
* Returns true if connect to datanode via hostname
*
@ -449,10 +431,6 @@ int getMaxDataLength() {
return maxDataLength;
}
public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
return cacheLoaderClass;
}
public String[] getPmemVolumes() {
return pmemDirs;
}

View File

@ -53,7 +53,6 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -183,9 +182,8 @@ public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
Class<? extends MappableBlockLoader> cacheLoaderClass =
dataset.datanode.getDnConf().getCacheLoaderClass();
this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
this.getDnConf());
cacheLoader.initialize(this);
}
@ -213,7 +211,7 @@ String getReplicaCachePath(String bpid, long blockId) {
return null;
}
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
return cacheLoader.getCachedPath(key);
return PmemVolumeManager.getInstance().getCachePath(key);
}
/**
@ -380,14 +378,13 @@ public void run() {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = cacheLoader.reserve(length);
long newUsedBytes = cacheLoader.reserve(key, length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
cacheLoader.getCacheCapacityConfigKey() +
" of " + cacheLoader.getCacheCapacity() + " exceeded.");
LOG.warn("Failed to cache " + key + ": could not reserve " +
"more bytes in the cache: " + cacheLoader.getCacheCapacity() +
" exceeded when try to reserve " + length + "bytes.");
return;
}
reservedBytes = true;
@ -442,10 +439,10 @@ public void run() {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
cacheLoader.release(length);
cacheLoader.release(key, length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, memCacheStats.getCacheUsed());
+ "bytes in total.", key, cacheLoader.getCacheUsed());
IOUtils.closeQuietly(mappableBlock);
numBlocksFailedToCache.incrementAndGet();
@ -519,7 +516,8 @@ public void run() {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
long newUsedBytes = cacheLoader.
release(key, value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
@ -592,4 +590,11 @@ public synchronized boolean isCached(String bpid, long blockId) {
MappableBlockLoader getCacheLoader() {
return cacheLoader;
}
/**
* This method can be executed during DataNode shutdown.
*/
void shutdown() {
cacheLoader.shutdown();
}
}

View File

@ -2328,6 +2328,8 @@ public void shutdown() {
"from LazyWriter.join");
}
}
cacheManager.shutdown();
}
@Override // FSDatasetMBean

View File

@ -65,26 +65,25 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
/**
* Try to reserve some given bytes.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
abstract long reserve(long bytesCount);
abstract long reserve(ExtendedBlockId key, long bytesCount);
/**
* Release some bytes that we're using.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
abstract long release(long bytesCount);
/**
* Get the config key of cache capacity.
*/
abstract String getCacheCapacityConfigKey();
abstract long release(ExtendedBlockId key, long bytesCount);
/**
* Get the approximate amount of cache space used.
@ -102,9 +101,11 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
abstract boolean isTransientCache();
/**
* Get a cache file path if applicable. Otherwise return null.
* Clean up cache, can be used during DataNode shutdown.
*/
abstract String getCachedPath(ExtendedBlockId key);
void shutdown() {
// Do nothing.
}
/**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
/**
* Creates MappableBlockLoader.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class MappableBlockLoaderFactory {
private MappableBlockLoaderFactory() {
// Prevent instantiation
}
/**
* Create a specific cache loader according to the configuration.
* If persistent memory volume is not configured, return a cache loader
* for DRAM cache. Otherwise, return a cache loader for pmem cache.
*/
public static MappableBlockLoader createCacheLoader(DNConf conf) {
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
return new MemoryMappableBlockLoader();
}
return new PmemMappableBlockLoader();
}
}

View File

@ -22,11 +22,12 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
@ -42,11 +43,13 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MemoryMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
private MemoryCacheStats memCacheStats;
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
this.memCacheStats = cacheManager.getMemCacheStats();
}
@ -148,11 +151,6 @@ private void verifyChecksum(long length, FileInputStream metaIn,
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
}
@Override
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
@ -164,12 +162,12 @@ public long getCacheCapacity() {
}
@Override
long reserve(long bytesCount) {
long reserve(ExtendedBlockId key, long bytesCount) {
return memCacheStats.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
long release(ExtendedBlockId key, long bytesCount) {
return memCacheStats.release(bytesCount);
}
@ -177,9 +175,4 @@ long release(long bytesCount) {
public boolean isTransientCache() {
return true;
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return null;
}
}

View File

@ -18,12 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
@ -53,14 +51,10 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
DNConf dnConf = cacheManager.getDnConf();
this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
dnConf.getPmemVolumes());
}
@VisibleForTesting
PmemVolumeManager getPmemVolumeManager() {
return pmemVolumeManager;
PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance();
}
/**
@ -69,7 +63,7 @@ PmemVolumeManager getPmemVolumeManager() {
* Map the block and verify its checksum.
*
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume selected by getOneLocation() method.
* is a persistent memory volume chosen by PmemVolumeManager.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@ -100,8 +94,7 @@ MappableBlock load(long length, FileInputStream blockIn,
throw new IOException("Block InputStream has no FileChannel.");
}
Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
filePath = pmemVolumeManager.getCachePath(key);
file = new RandomAccessFile(filePath, "rw");
out = file.getChannel().
map(FileChannel.MapMode.READ_WRITE, 0, length);
@ -111,9 +104,7 @@ MappableBlock load(long length, FileInputStream blockIn,
}
verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
blockFileName);
mappableBlock = new PmemMappedBlock(
length, volumeIndex, key, pmemVolumeManager);
pmemVolumeManager.afterCache(key, volumeIndex);
mappableBlock = new PmemMappedBlock(length, key);
LOG.info("Successfully cached one replica:{} into persistent memory"
+ ", [cached path={}, length={}]", key, filePath, length);
} finally {
@ -123,6 +114,7 @@ MappableBlock load(long length, FileInputStream blockIn,
}
IOUtils.closeQuietly(file);
if (mappableBlock == null) {
LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
FsDatasetUtil.deleteMappedFile(filePath);
}
}
@ -193,11 +185,6 @@ private void verifyChecksumAndMapBlock(
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
}
@Override
public long getCacheUsed() {
return pmemVolumeManager.getCacheUsed();
@ -209,13 +196,13 @@ public long getCacheCapacity() {
}
@Override
long reserve(long bytesCount) {
return pmemVolumeManager.reserve(bytesCount);
long reserve(ExtendedBlockId key, long bytesCount) {
return pmemVolumeManager.reserve(key, bytesCount);
}
@Override
long release(long bytesCount) {
return pmemVolumeManager.release(bytesCount);
long release(ExtendedBlockId key, long bytesCount) {
return pmemVolumeManager.release(key, bytesCount);
}
@Override
@ -224,7 +211,8 @@ public boolean isTransientCache() {
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return pmemVolumeManager.getCacheFilePath(key);
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");
PmemVolumeManager.getInstance().cleanup();
}
}

View File

@ -35,18 +35,13 @@
public class PmemMappedBlock implements MappableBlock {
private static final Logger LOG =
LoggerFactory.getLogger(PmemMappedBlock.class);
private final PmemVolumeManager pmemVolumeManager;
private long length;
private Byte volumeIndex = null;
private ExtendedBlockId key;
PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
PmemVolumeManager pmemVolumeManager) {
PmemMappedBlock(long length, ExtendedBlockId key) {
assert length > 0;
this.length = length;
this.volumeIndex = volumeIndex;
this.key = key;
this.pmemVolumeManager = pmemVolumeManager;
}
@Override
@ -57,10 +52,9 @@ public long getLength() {
@Override
public void close() {
String cacheFilePath =
pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
PmemVolumeManager.getInstance().getCachePath(key);
try {
FsDatasetUtil.deleteMappedFile(cacheFilePath);
pmemVolumeManager.afterUncache(key);
LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
} catch (IOException e) {

View File

@ -35,6 +35,7 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -45,14 +46,19 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PmemVolumeManager {
public final class PmemVolumeManager {
/**
* Counts used bytes for persistent memory.
*/
private class UsedBytesCount {
private static class UsedBytesCount {
private final long maxBytes;
private final AtomicLong usedBytes = new AtomicLong(0);
UsedBytesCount(long maxBytes) {
this.maxBytes = maxBytes;
}
/**
* Try to reserve more bytes.
*
@ -65,7 +71,7 @@ long reserve(long bytesCount) {
while (true) {
long cur = usedBytes.get();
long next = cur + bytesCount;
if (next > cacheCapacity) {
if (next > maxBytes) {
return -1;
}
if (usedBytes.compareAndSet(cur, next)) {
@ -85,42 +91,76 @@ long release(long bytesCount) {
return usedBytes.addAndGet(-bytesCount);
}
long get() {
long getUsedBytes() {
return usedBytes.get();
}
long getMaxBytes() {
return maxBytes;
}
long getAvailableBytes() {
return maxBytes - usedBytes.get();
}
}
private static final Logger LOG =
LoggerFactory.getLogger(PmemVolumeManager.class);
public static final String CACHE_DIR = "hdfs_pmem_cache";
private static PmemVolumeManager pmemVolumeManager = null;
private final ArrayList<String> pmemVolumes = new ArrayList<>();
// Maintain which pmem volume a block is cached to.
private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
new ConcurrentHashMap<>();
private final UsedBytesCount usedBytesCount;
private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
/**
* The total cache capacity in bytes of persistent memory.
* It is 0L if the specific mappableBlockLoader couldn't cache data to pmem.
*/
private final long cacheCapacity;
private long cacheCapacity;
private static long maxBytesPerPmem = -1;
private int count = 0;
// Strict atomic operation is not guaranteed for the performance sake.
private int i = 0;
private byte nextIndex = 0;
PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
throws IOException {
if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
throw new IOException("The persistent memory volume, " +
DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
" is not configured!");
}
this.loadVolumes(pmemVolumesConfigured);
this.usedBytesCount = new UsedBytesCount();
this.cacheCapacity = maxBytes;
this.loadVolumes(pmemVolumesConfig);
cacheCapacity = 0L;
for (UsedBytesCount counter : usedBytesCounts) {
cacheCapacity += counter.getMaxBytes();
}
}
public synchronized static void init(String[] pmemVolumesConfig)
throws IOException {
if (pmemVolumeManager == null) {
pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
}
}
public static PmemVolumeManager getInstance() {
if (pmemVolumeManager == null) {
throw new RuntimeException(
"The pmemVolumeManager should be instantiated!");
}
return pmemVolumeManager;
}
@VisibleForTesting
public static void setMaxBytes(long maxBytes) {
maxBytesPerPmem = maxBytes;
}
public long getCacheUsed() {
return usedBytesCount.get();
long usedBytes = 0L;
for (UsedBytesCount counter : usedBytesCounts) {
usedBytes += counter.getUsedBytes();
}
return usedBytes;
}
public long getCacheCapacity() {
@ -130,24 +170,40 @@ public long getCacheCapacity() {
/**
* Try to reserve more bytes on persistent memory.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long bytesCount) {
return usedBytesCount.reserve(bytesCount);
synchronized long reserve(ExtendedBlockId key, long bytesCount) {
try {
byte index = chooseVolume(bytesCount);
long usedBytes = usedBytesCounts.get(index).reserve(bytesCount);
// Put the entry into blockKeyToVolume if reserving bytes succeeded.
if (usedBytes > 0) {
blockKeyToVolume.put(key, index);
}
return usedBytes;
} catch (IOException e) {
LOG.warn(e.getMessage());
return -1L;
}
}
/**
* Release some bytes that we're using on persistent memory.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
long release(long bytesCount) {
return usedBytesCount.release(bytesCount);
long release(ExtendedBlockId key, long bytesCount) {
Byte index = blockKeyToVolume.remove(key);
return usedBytesCounts.get(index).release(bytesCount);
}
/**
@ -155,46 +211,70 @@ long release(long bytesCount) {
*
* @throws IOException If there is no available pmem volume.
*/
private void loadVolumes(String[] volumes) throws IOException {
private void loadVolumes(String[] volumes)
throws IOException {
// Check whether the volume exists
for (String volume: volumes) {
for (byte n = 0; n < volumes.length; n++) {
try {
File pmemDir = new File(volume);
verifyIfValidPmemVolume(pmemDir);
// Remove all files under the volume.
FileUtils.cleanDirectory(pmemDir);
File pmemDir = new File(volumes[n]);
File realPmemDir = verifyIfValidPmemVolume(pmemDir);
this.pmemVolumes.add(realPmemDir.getPath());
long maxBytes;
if (maxBytesPerPmem == -1) {
maxBytes = realPmemDir.getUsableSpace();
} else {
maxBytes = maxBytesPerPmem;
}
UsedBytesCount usedBytesCount = new UsedBytesCount(maxBytes);
this.usedBytesCounts.add(usedBytesCount);
LOG.info("Added persistent memory - {} with size={}",
volumes[n], maxBytes);
} catch (IllegalArgumentException e) {
LOG.error("Failed to parse persistent memory volume " + volume, e);
LOG.error("Failed to parse persistent memory volume " + volumes[n], e);
continue;
} catch (IOException e) {
LOG.error("Bad persistent memory volume: " + volume, e);
LOG.error("Bad persistent memory volume: " + volumes[n], e);
continue;
}
pmemVolumes.add(volume);
LOG.info("Added persistent memory - " + volume);
}
count = pmemVolumes.size();
if (count == 0) {
throw new IOException(
"At least one valid persistent memory volume is required!");
}
cleanup();
}
void cleanup() {
// Remove all files under the volume.
for (String pmemDir: pmemVolumes) {
try {
FileUtils.cleanDirectory(new File(pmemDir));
} catch (IOException e) {
LOG.error("Failed to clean up " + pmemDir, e);
}
}
}
@VisibleForTesting
static void verifyIfValidPmemVolume(File pmemDir)
static File verifyIfValidPmemVolume(File pmemDir)
throws IOException {
if (!pmemDir.exists()) {
final String message = pmemDir + " does not exist";
throw new IOException(message);
}
if (!pmemDir.isDirectory()) {
final String message = pmemDir + " is not a directory";
throw new IllegalArgumentException(message);
}
File realPmemDir = new File(getRealPmemDir(pmemDir.getPath()));
if (!realPmemDir.exists() && !realPmemDir.mkdir()) {
throw new IOException("Failed to create " + realPmemDir.getPath());
}
String uuidStr = UUID.randomUUID().toString();
String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
String testFilePath = realPmemDir.getPath() + "/.verify.pmem." + uuidStr;
byte[] contents = uuidStr.getBytes("UTF-8");
RandomAccessFile testFile = null;
MappedByteBuffer out = null;
@ -203,15 +283,17 @@ static void verifyIfValidPmemVolume(File pmemDir)
out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
contents.length);
if (out == null) {
throw new IOException("Failed to map the test file under " + pmemDir);
throw new IOException(
"Failed to map the test file under " + realPmemDir);
}
out.put(contents);
// Forces to write data to storage device containing the mapped file
out.force();
return realPmemDir;
} catch (IOException e) {
throw new IOException(
"Exception while writing data to persistent storage dir: " +
pmemDir, e);
realPmemDir, e);
} finally {
if (out != null) {
out.clear();
@ -229,18 +311,38 @@ static void verifyIfValidPmemVolume(File pmemDir)
}
}
public static String getRealPmemDir(String rawPmemDir) {
return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
}
/**
* Choose a persistent memory volume based on a specific algorithm.
* Currently it is a round-robin policy.
*
* TODO: Refine volume selection policy by considering storage utilization.
*/
Byte getOneVolumeIndex() throws IOException {
if (count != 0) {
return (byte)(i++ % count);
} else {
synchronized Byte chooseVolume(long bytesCount) throws IOException {
if (count == 0) {
throw new IOException("No usable persistent memory is found");
}
int k = 0;
long maxAvailableSpace = 0L;
while (k++ != count) {
if (nextIndex == count) {
nextIndex = 0;
}
byte index = nextIndex++;
long availableBytes = usedBytesCounts.get(index).getAvailableBytes();
if (availableBytes >= bytesCount) {
return index;
}
if (availableBytes > maxAvailableSpace) {
maxAvailableSpace = availableBytes;
}
}
throw new IOException("There is no enough persistent memory space " +
"for caching. The current max available space is " +
maxAvailableSpace + ", but " + bytesCount + "is required.");
}
@VisibleForTesting
@ -276,7 +378,7 @@ public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
/**
* The cache file path is pmemVolume/BlockPoolId-BlockId.
*/
public String getCacheFilePath(ExtendedBlockId key) {
public String getCachePath(ExtendedBlockId key) {
Byte volumeIndex = blockKeyToVolume.get(key);
if (volumeIndex == null) {
return null;
@ -288,19 +390,4 @@ public String getCacheFilePath(ExtendedBlockId key) {
Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
return blockKeyToVolume;
}
/**
* Add cached block's ExtendedBlockId and its cache volume index to a map
* after cache.
*/
public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
blockKeyToVolume.put(key, volumeIndex);
}
/**
* Remove the record in blockKeyToVolume for uncached block after uncache.
*/
public void afterUncache(ExtendedBlockId key) {
blockKeyToVolume.remove(key);
}
}

View File

@ -2526,18 +2526,6 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.loader.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
<description>
Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader.
By default, MemoryMappableBlockLoader is used and it maps block replica into memory.
PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is
implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent
memory directory.
</description>
</property>
<property>
<name>dfs.datanode.max.locked.memory</name>
<value>0</value>
@ -2555,18 +2543,6 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.capacity</name>
<value>0</value>
<description>
The amount of persistent memory in bytes that can be used to cache block
replicas to persistent memory. Currently, persistent memory is only enabled
in HDFS Centralized Cache Management feature.
By default, this parameter is 0, which disables persistent memory caching.
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.dirs</name>
<value></value>

View File

@ -21,8 +21,6 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
@ -139,14 +137,11 @@ public void setUp() throws Exception {
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
// Configuration for pmem cache
conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
"org.apache.hadoop.hdfs.server.datanode." +
"fsdataset.impl.PmemMappableBlockLoader");
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
@ -183,18 +178,17 @@ protected static void shutdownCluster() {
@Test
public void testPmemVolumeManager() throws IOException {
PmemVolumeManager pmemVolumeManager =
cacheLoader.getPmemVolumeManager();
PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
assertNotNull(pmemVolumeManager);
assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
// Test round-robin selection policy
long count1 = 0, count2 = 0;
for (int i = 0; i < 10; i++) {
Byte index = pmemVolumeManager.getOneVolumeIndex();
Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
String volume = pmemVolumeManager.getVolumeByIndex(index);
if (volume.equals(PMEM_DIR_0)) {
if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
count1++;
} else if (volume.equals(PMEM_DIR_1)) {
} else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
count2++;
} else {
fail("Unexpected persistent storage location:" + volume);
@ -254,7 +248,7 @@ public Boolean get() {
// The pmem cache space is expected to have been used up.
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume =
cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
@ -266,11 +260,13 @@ public Boolean get() {
// to pmem.
assertNotNull(cachePath);
String expectFileName =
cacheLoader.getPmemVolumeManager().getCacheFileName(key);
PmemVolumeManager.getInstance().getCacheFileName(key);
if (cachePath.startsWith(PMEM_DIR_0)) {
assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
assertTrue(cachePath.equals(PmemVolumeManager
.getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
assertTrue(cachePath.equals(PmemVolumeManager
.getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
}

View File

@ -401,9 +401,10 @@ public void testFilesExceedMaxLockedMemory() throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// check the log reported by FsDataSetCache
// in the case that cache capacity is exceeded.
int lines = appender.countLinesWithMessage(
"more bytes in the cache: " +
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
"could not reserve more bytes in the cache: ");
return lines > 0;
}
}, 500, 30000);