diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt index 961b421833..335c55dc9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt @@ -13,3 +13,6 @@ HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal) + HDFS-6926. DN support for saving replicas to persistent storage and + evicting in-memory replicas. (Arpit Agarwal) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 71a530ba18..580b7a9674 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -123,6 +123,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0; public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume"; public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4; + public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec"; + public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60; public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT = "dfs.namenode.path.based.cache.block.map.allocation.percent"; public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f; @@ -227,6 +229,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f; public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms"; public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000; + + public static final String DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec"; + public static final int DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60; public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush"; public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index bbb67fc473..61f1e7e7ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -191,7 +191,7 @@ public LinkedElement getNext() { + hours + " hours for block pool " + bpid); // get the list of blocks and arrange them in random order - List arr = dataset.getFinalizedBlocks(blockPoolId); + List arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId); Collections.shuffle(arr); long scanTime = -1; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 4b9656eb8e..693dcab5ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -81,6 +81,7 @@ public class DataStorage extends Storage { final static String STORAGE_DIR_DETACHED = "detach"; public final static String STORAGE_DIR_RBW = "rbw"; public final static String STORAGE_DIR_FINALIZED = "finalized"; + public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist"; public final static String STORAGE_DIR_TMP = "tmp"; // Set of bpids for which 'trash' is currently enabled. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index f808e0107f..dc57688834 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -225,7 +225,7 @@ public ReplicaOutputStreams createStreams(boolean isCreate, } } } else { - // for create, we can use the requested checksum + // for create, we can use the requested checksum checksum = requestedChecksum; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java index 235cd7b9c6..d0d36ba1df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java @@ -138,8 +138,7 @@ public synchronized V chooseVolume(List volumes, if (mostAvailableAmongLowVolumes < replicaSize || random.nextFloat() < scaledPreferencePercent) { volume = roundRobinPolicyHighAvailable.chooseVolume( - highAvailableVolumes, - replicaSize); + highAvailableVolumes, replicaSize); if (LOG.isDebugEnabled()) { LOG.debug("Volumes are imbalanced. Selecting " + volume + " from high available space volumes for write of block size " @@ -147,8 +146,7 @@ public synchronized V chooseVolume(List volumes, } } else { volume = roundRobinPolicyLowAvailable.chooseVolume( - lowAvailableVolumes, - replicaSize); + lowAvailableVolumes, replicaSize); if (LOG.isDebugEnabled()) { LOG.debug("Volumes are imbalanced. 
Selecting " + volume + " from low available space volumes for write of block size " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 5f39a3d784..94de48b512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -113,6 +113,9 @@ public StorageReport[] getStorageReports(String bpid) /** @return a list of finalized blocks for the given block pool. */ public List getFinalizedBlocks(String bpid); + /** @return a list of finalized blocks for the given block pool. */ + public List getFinalizedBlocksOnPersistentStorage(String bpid); + /** * Check whether the in-memory block record matches the block on the disk, * and, in case that they are not matched, update the record or mark it diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index af467b93f0..31a254bf22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -28,6 +28,7 @@ import java.io.RandomAccessFile; import java.util.Scanner; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DU; import org.apache.hadoop.fs.FileUtil; @@ -61,6 +62,7 @@ class BlockPoolSlice { private final File currentDir; // StorageDirectory/current/bpid/current // directory where finalized replicas are stored private final File finalizedDir; + private final File lazypersistDir; private final File rbwDir; // directory store RBW replica private final File tmpDir; // directory store Temporary replica private static final String DU_CACHE_FILE = "dfsUsed"; @@ -85,12 +87,24 @@ class BlockPoolSlice { this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); this.finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); + this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST); if (!this.finalizedDir.exists()) { if (!this.finalizedDir.mkdirs()) { throw new IOException("Failed to mkdirs " + this.finalizedDir); } } + // Delete all checkpointed replicas on startup. + // TODO: We can move checkpointed replicas to the finalized dir and delete + // the copy on RAM_DISK. For now we take the simpler approach. + + FileUtil.fullyDelete(lazypersistDir); + if (!this.lazypersistDir.exists()) { + if (!this.lazypersistDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + this.lazypersistDir); + } + } + // Files that were being written when the datanode was last shutdown // are now moved back to the data directory. 
It is possible that // in the future, we might want to do some sort of datanode-local @@ -136,6 +150,10 @@ File getFinalizedDir() { return finalizedDir; } + File getLazypersistDir() { + return lazypersistDir; + } + File getRbwDir() { return rbwDir; } @@ -252,12 +270,37 @@ File addBlock(Block b, File f) throws IOException { dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); return blockFile; } - + + File lazyPersistReplica(Block b, File f) throws IOException { + File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir); + File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); + dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length()); + return blockFile; + } + + /** + * Move a persisted replica from lazypersist directory to a subdirectory + * under finalized. + */ + File activateSavedReplica(Block b, File blockFile) throws IOException { + final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); + final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); + final File targetBlockFile = new File(blockDir, blockFile.getName()); + final File targetMetaFile = new File(blockDir, metaFile.getName()); + FileUtils.moveFile(blockFile, targetBlockFile); + FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile); + FileUtils.moveFile(metaFile, targetMetaFile); + FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile); + return targetBlockFile; + } + void checkDirs() throws DiskErrorException { DiskChecker.checkDirs(finalizedDir); DiskChecker.checkDir(tmpDir); DiskChecker.checkDir(rbwDir); } + + void getVolumeMap(ReplicaMap volumeMap) throws IOException { // add finalized replicas diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index b875497dc8..8643d6bc08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -27,12 +27,7 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Executor; import javax.management.NotCompliantMBeanException; @@ -88,6 +83,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; @@ -111,7 +107,6 @@ class FsDatasetImpl implements FsDatasetSpi { } } - @Override // FsDatasetSpi public List getVolumes() { return volumes.volumes; @@ -204,11 +199,17 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) final FsVolumeList volumes; final Map storageMap; final FsDatasetAsyncDiskService asyncDiskService; + final Daemon lazyWriter; final FsDatasetCache cacheManager; private final Configuration conf; private final int validVolsRequired; + private volatile boolean fsRunning; final ReplicaMap 
volumeMap; + final LazyWriteReplicaTracker lazyWriteReplicaTracker; + + private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3; + // Used for synchronizing access to usage stats private final Object statsLock = new Object(); @@ -218,6 +219,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf ) throws IOException { + this.fsRunning = true; this.datanode = datanode; this.dataStorage = storage; this.conf = conf; @@ -248,6 +250,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) storageMap = new HashMap(); volumeMap = new ReplicaMap(this); + lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this); + @SuppressWarnings("unchecked") final VolumeChoosingPolicy blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( @@ -257,11 +261,17 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) volumes = new FsVolumeList(volsFailed, blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); + // TODO: Initialize transientReplicaTracker from blocks on disk. + for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } cacheManager = new FsDatasetCache(this); + lazyWriter = new Daemon(new LazyWriter( + conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, + DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC))); + lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); } @@ -531,8 +541,8 @@ public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD()); } - static File moveBlockFiles(Block b, File srcfile, File destdir - ) throws IOException { + static File moveBlockFiles(Block b, File srcfile, File destdir) + throws IOException { final File dstfile = new File(destdir, b.getBlockName()); final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp()); final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp()); @@ -555,6 +565,30 @@ static File moveBlockFiles(Block b, File srcfile, File destdir return dstfile; } + static File copyBlockFiles(Block b, File srcfile, File destdir) + throws IOException { + final File dstfile = new File(destdir, b.getBlockName()); + final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp()); + final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp()); + try { + FileUtils.copyFile(srcmeta, dstmeta); + } catch (IOException e) { + throw new IOException("Failed to copy meta file for " + b + + " from " + srcmeta + " to " + dstmeta, e); + } + try { + FileUtils.copyFile(srcfile, dstfile); + } catch (IOException e) { + throw new IOException("Failed to copy block file for " + b + + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("copyBlockFiles: Copied " + srcmeta + " to " + dstmeta + + " and " + srcfile + " to " + dstfile); + } + return dstfile; + } + static private void truncateBlock(File blockFile, File metaFile, long oldlen, long newlen) throws IOException { LOG.info("truncateBlock: blockFile=" + blockFile @@ -817,6 +851,83 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo, } } + /** + * Attempt to evict one or more transient block replicas until we have at least + * spaceNeeded bytes free. + * + * @return true if we were able to free up at least spaceNeeded bytes, false + * otherwise.
+ */ + private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded) + throws IOException { + + boolean isAvailable = false; + + LOG.info("Attempting to evict blocks from transient storage"); + + // Reverse the map so we can iterate in order of replica creation times, + // evicting oldest replicas one at a time until we have sufficient space. + TreeMultimap lruMap = + lazyWriteReplicaTracker.getLruMap(); + int blocksEvicted = 0; + + // TODO: It is really inefficient to do this with the Object lock held! + // TODO: This logic is here just for prototyping. + // TODO: We should replace it with proactive discard when ram_disk free space + // TODO: falls below a low watermark. That way we avoid fs operations on the + // TODO: hot path with the lock held. + synchronized (this) { + long currentTime = System.currentTimeMillis() / 1000; + for (Map.Entry entry : lruMap.entries()) { + LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue(); + LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId + + "; block LMT=" + entry.getKey() + + "; currentTime=" + currentTime); + ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId); + Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); + File blockFile = replicaInfo.getBlockFile(); + File metaFile = replicaInfo.getMetaFile(); + long used = blockFile.length() + metaFile.length(); + lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false); + + // Move the persisted replica to the finalized directory of + // the target volume. + BlockPoolSlice bpSlice = + lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid); + File newBlockFile = bpSlice.activateSavedReplica( + replicaInfo, lazyWriteReplica.savedBlockFile); + + ReplicaInfo newReplicaInfo = + new FinalizedReplica(replicaInfo.getBlockId(), + replicaInfo.getBytesOnDisk(), + replicaInfo.getGenerationStamp(), + lazyWriteReplica.lazyPersistVolume, + newBlockFile.getParentFile()); + + // Update the volumeMap entry. This removes the old entry. + volumeMap.add(bpid, newReplicaInfo); + + // Remove the old replicas. + blockFile.delete(); + metaFile.delete(); + ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used); + ++blocksEvicted; + + if (replicaInfo.getVolume().getAvailable() > spaceNeeded) { + LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block"); + isAvailable = true; + break; + } + + if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) { + break; + } + } + } + + return isAvailable; + } + @Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { @@ -839,7 +950,13 @@ public synchronized ReplicaInPipeline createRbw(StorageType storageType, } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { - allowLazyPersist = false; + if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) { + // Eviction did not work, we'll just fallback to DEFAULT storage. + LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() + + " bytes for new block. 
Will fallback to DEFAULT " + + "storage"); + allowLazyPersist = false; + } continue; } throw de; @@ -851,6 +968,7 @@ public synchronized ReplicaInPipeline createRbw(StorageType storageType, ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + return newReplicaInfo; } @@ -988,7 +1106,6 @@ public synchronized ReplicaInPipeline createTemporary(StorageType storageType, ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - return newReplicaInfo; } @@ -1054,8 +1171,17 @@ private synchronized FinalizedReplica finalizeReplica(String bpid, File dest = v.addBlock(bpid, replicaInfo, f); newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); + + if (v.isTransientStorage()) { + lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v); + + // Schedule a checkpoint. + ((LazyWriter) lazyWriter.getRunnable()) + .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId()); + } } volumeMap.add(bpid, newReplicaInfo); + return newReplicaInfo; } @@ -1075,6 +1201,9 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { replicaInfo.getMetaFile(), b.getLocalBlock())) { LOG.warn("Block " + b + " unfinalized and removed. " ); } + if (replicaInfo.getVolume().isTransientStorage()) { + lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true); + } } } @@ -1170,6 +1299,22 @@ public synchronized List getFinalizedBlocks(String bpid) { return finalized; } + /** + * Get the list of finalized blocks from in-memory blockmap for a block pool. + */ + @Override + public synchronized List getFinalizedBlocksOnPersistentStorage(String bpid) { + ArrayList finalized = + new ArrayList(volumeMap.size(bpid)); + for (ReplicaInfo b : volumeMap.replicas(bpid)) { + if(!b.getVolume().isTransientStorage() && + b.getState() == ReplicaState.FINALIZED) { + finalized.add(new FinalizedReplica((FinalizedReplica)b)); + } + } + return finalized; + } + /** * Check whether the given block is a valid one. * valid means finalized @@ -1287,6 +1432,10 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException { volumeMap.remove(bpid, invalidBlks[i]); } + if (v.isTransientStorage()) { + lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true); + } + // If a DFSClient has the replica in its cache of short-circuit file // descriptors (and the client is using ShortCircuitShm), invalidate it. 
datanode.getShortCircuitRegistry().processBlockInvalidation( @@ -1482,8 +1631,14 @@ void registerMBean(final String datanodeUuid) { @Override // FsDatasetSpi public void shutdown() { - if (mbeanName != null) + fsRunning = false; + + ((LazyWriter) lazyWriter.getRunnable()).stop(); + lazyWriter.interrupt(); + + if (mbeanName != null) { MBeans.unregister(mbeanName); + } if (asyncDiskService != null) { asyncDiskService.shutdown(); @@ -1492,6 +1647,13 @@ public void shutdown() { if(volumes != null) { volumes.shutdown(); } + + try { + lazyWriter.join(); + } catch (InterruptedException ie) { + LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + + "from LazyWriter.join"); + } } @Override // FSDatasetMBean @@ -1524,7 +1686,7 @@ public String getStorageInfo() { */ @Override public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) { + File diskMetaFile, FsVolumeSpi vol) throws IOException { Block corruptBlock = null; ReplicaInfo memBlockInfo; synchronized (this) { @@ -1557,6 +1719,9 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile, if (blockScanner != null) { blockScanner.deleteBlock(bpid, new Block(blockId)); } + if (vol.isTransientStorage()) { + lazyWriteReplicaTracker.discardReplica(bpid, blockId, true); + } LOG.warn("Removed block " + blockId + " from memory with missing block file on the disk"); // Finally remove the metadata file @@ -1580,6 +1745,9 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile, if (blockScanner != null) { blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo)); } + if (vol.isTransientStorage()) { + lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); + } LOG.warn("Added missing block to memory " + diskBlockInfo); return; } @@ -1757,9 +1925,9 @@ public synchronized String updateReplicaUnderRecovery( final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); LOG.info("updateReplica: " + oldBlock - + ", recoveryId=" + recoveryId - + ", length=" + newlength - + ", replica=" + replica); + + ", recoveryId=" + recoveryId + + ", length=" + newlength + + ", replica=" + replica); //check replica if (replica == null) { @@ -2019,5 +2187,123 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, nbytes, flags); } + + private static class BlockIdPair { + final String bpid; + final long blockId; + + BlockIdPair(final String bpid, final long blockId) { + this.bpid = bpid; + this.blockId = blockId; + } + } + + private class LazyWriter implements Runnable { + private volatile boolean shouldRun = true; + final int checkpointerInterval; + + final private Queue blocksPendingCheckpoint; + + public LazyWriter(final int checkpointerInterval) { + this.checkpointerInterval = checkpointerInterval; + blocksPendingCheckpoint = new LinkedList(); + } + + // Schedule a replica for writing to persistent storage. 
+ public synchronized void addReplicaToLazyWriteQueue( + String bpid, long blockId) { + LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue"); + blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId)); + } + + private void moveReplicaToNewVolume(String bpid, long blockId) + throws IOException { + + LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid); + + FsVolumeImpl targetVolume = null; + Block block = null; + File blockFile = null; + + synchronized (this) { + block = getStoredBlock(bpid, blockId); + blockFile = getFile(bpid, blockId); + + if (block == null) { + // The block was deleted before it could be checkpointed. + return; + } + + // Pick a target volume for the block. + targetVolume = volumes.getNextVolume( + StorageType.DEFAULT, block.getNumBytes()); + } + + LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid); + lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume); + File savedBlockFile = targetVolume.getBlockPoolSlice(bpid) + .lazyPersistReplica(block, blockFile); + lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile); + LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + + " to file " + savedBlockFile); + } + + /** + * Checkpoint a pending replica to persistent storage now. + * @return true if there is more work to be done, false otherwise. + */ + private boolean saveNextReplica() { + BlockIdPair blockIdPair = null; + int moreWorkThreshold = 0; + + try { + synchronized (this) { + // Dequeue the next replica waiting to be checkpointed. + blockIdPair = blocksPendingCheckpoint.poll(); + if (blockIdPair == null) { + LOG.info("LazyWriter has no blocks to persist. " + + "Thread going to sleep."); + return false; + } + } + + // Move the replica outside the lock. + moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId); + + } catch(IOException ioe) { + // If we failed, put the block on the queue and let a retry + // interval elapse before we try again so we don't try to keep + // checkpointing the same block in a tight loop. 
+ synchronized (this) { + blocksPendingCheckpoint.add(blockIdPair); + ++moreWorkThreshold; + } + } + + synchronized (this) { + return blocksPendingCheckpoint.size() > moreWorkThreshold; + } + } + + @Override + public void run() { + while (fsRunning && shouldRun) { + try { + if (!saveNextReplica()) { + Thread.sleep(checkpointerInterval * 1000); + } + } catch (InterruptedException e) { + LOG.info("LazyWriter was interrupted, exiting"); + break; + } catch (Exception e) { + LOG.error("Ignoring exception in LazyWriter:", e); + } + } + } + + public void stop() { + shouldRun = false; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index e4b0f2b1ec..d3c585d6f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -328,6 +328,8 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException { File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); File finalizedDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_FINALIZED); + File lazypersistDir = new File(bpCurrentDir, + DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { FileUtil.fullyDelete(bpDir); @@ -339,6 +341,10 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException { !FileUtil.fullyDelete(finalizedDir)) { throw new IOException("Failed to delete " + finalizedDir); } + if (!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) || + !FileUtil.fullyDelete(lazypersistDir)) { + throw new IOException("Failed to delete " + lazypersistDir); + } FileUtil.fullyDelete(tmpDir); for (File f : FileUtil.listFiles(bpCurrentDir)) { if (!f.delete()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java new file mode 100644 index 0000000000..ae28f09212 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + + +import com.google.common.collect.Multimap; +import com.google.common.collect.TreeMultimap; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +class LazyWriteReplicaTracker { + + enum State { + IN_MEMORY, + LAZY_PERSIST_IN_PROGRESS, + LAZY_PERSIST_COMPLETE, + } + + static class ReplicaState implements Comparable { + + final String bpid; + final long blockId; + State state; + + /** + * transient storage volume that holds the original replica. + */ + final FsVolumeImpl transientVolume; + + /** + * Persistent volume that holds or will hold the saved replica. + */ + FsVolumeImpl lazyPersistVolume; + File savedBlockFile; + + ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) { + this.bpid = bpid; + this.blockId = blockId; + this.transientVolume = transientVolume; + state = State.IN_MEMORY; + lazyPersistVolume = null; + savedBlockFile = null; + } + + @Override + public int hashCode() { + return bpid.hashCode() ^ (int) blockId; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + ReplicaState otherState = (ReplicaState) other; + return (otherState.bpid.equals(bpid) && otherState.blockId == blockId); + } + + @Override + public int compareTo(ReplicaState other) { + if (blockId == other.blockId) { + return 0; + } else if (blockId < other.blockId) { + return -1; + } else { + return 1; + } + } + } + + final FsDatasetImpl fsDataset; + + /** + * Map of blockpool ID to map of blockID to ReplicaInfo. + */ + final Map> replicaMaps; + + /** + * A map of blockId to persist complete time for transient blocks. This allows + * us to evict LRU blocks from transient storage. Protected by 'this' + * Object lock. + */ + final Map persistTimeMap; + + LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) { + this.fsDataset = fsDataset; + replicaMaps = new HashMap>(); + persistTimeMap = new HashMap(); + } + + TreeMultimap getLruMap() { + // TODO: This can be made more efficient. 
+ TreeMultimap reversedMap = TreeMultimap.create(); + for (Map.Entry entry : persistTimeMap.entrySet()) { + reversedMap.put(entry.getValue(), entry.getKey()); + } + return reversedMap; + } + + synchronized void addReplica(String bpid, long blockId, + final FsVolumeImpl transientVolume) { + Map map = replicaMaps.get(bpid); + if (map == null) { + map = new HashMap(); + replicaMaps.put(bpid, map); + } + map.put(blockId, new ReplicaState(bpid, blockId, transientVolume)); + } + + synchronized void recordStartLazyPersist( + final String bpid, final long blockId, FsVolumeImpl checkpointVolume) { + Map map = replicaMaps.get(bpid); + ReplicaState replicaState = map.get(blockId); + replicaState.state = State.LAZY_PERSIST_IN_PROGRESS; + replicaState.lazyPersistVolume = checkpointVolume; + } + + synchronized void recordEndLazyPersist( + final String bpid, final long blockId, File savedBlockFile) { + Map map = replicaMaps.get(bpid); + ReplicaState replicaState = map.get(blockId); + + if (replicaState == null) { + throw new IllegalStateException("Unknown replica bpid=" + + bpid + "; blockId=" + blockId); + } + replicaState.state = State.LAZY_PERSIST_COMPLETE; + replicaState.savedBlockFile = savedBlockFile; + persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000); + } + + synchronized void discardReplica( + final String bpid, final long blockId, boolean force) { + Map map = replicaMaps.get(bpid); + ReplicaState replicaState = map.get(blockId); + + if (replicaState == null) { + if (force) { + return; + } + throw new IllegalStateException("Unknown replica bpid=" + + bpid + "; blockId=" + blockId); + } + + if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) { + throw new IllegalStateException("Discarding replica without " + + "saving it to disk bpid=" + bpid + "; blockId=" + blockId); + + } + + map.remove(blockId); + persistTimeMap.remove(replicaState); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index f1b570d3b1..6183ba1b6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1111,6 +1111,11 @@ public List getFinalizedBlocks(String bpid) { throw new UnsupportedOperationException(); } + @Override + public List getFinalizedBlocksOnPersistentStorage(String bpid) { + throw new UnsupportedOperationException(); + } + @Override public Map getVolumeInfoMap() { throw new UnsupportedOperationException(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index 5153e76f96..db1379119d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
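
Editor's note (not part of the patch): the change adds two tuning keys in DFSConfigKeys, dfs.datanode.lazywriter.interval.sec (default 60 seconds, the interval the LazyWriter daemon sleeps when it has no replicas left to persist) and dfs.namenode.lazypersist.file.scrub.interval.sec (default 300 seconds). Eviction order comes from LazyWriteReplicaTracker.getLruMap(), which reverses persistTimeMap into a TreeMultimap keyed by persist-completion time so that FsDatasetImpl.tryToEvictBlocks() visits the oldest persisted replicas first. The standalone sketch below only illustrates that ordering with Guava's TreeMultimap; the class name, block IDs and timestamps are invented for illustration, and the patch keys the map on ReplicaState objects rather than bare block IDs.

import com.google.common.collect.TreeMultimap;

import java.util.Map;

public class LruOrderSketch {
  public static void main(String[] args) {
    // Key: persist-completion time in seconds; value: block ID.
    TreeMultimap<Long, Long> lru = TreeMultimap.create();
    lru.put(1400000300L, 1001L);
    lru.put(1400000100L, 1002L);
    lru.put(1400000200L, 1003L);

    // TreeMultimap.entries() iterates keys in ascending order, so the
    // replica that finished persisting earliest is offered for eviction
    // first, mirroring the loop in tryToEvictBlocks().
    for (Map.Entry<Long, Long> entry : lru.entries()) {
      System.out.println("Eviction candidate blockId=" + entry.getValue()
          + ", persisted at " + entry.getKey());
    }
  }
}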