From fa6e59891c6125ae83fd601dbbcf928685f5dbfd Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Tue, 25 Feb 2014 19:27:53 +0000
Subject: [PATCH] HDFS-5498. Improve datanode startup time. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571797 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES_HDFS-5535.txt         |  2 +
 .../fsdataset/impl/BlockPoolSlice.java        | 91 ++++++++++++++++++-
 .../fsdataset/impl/FsDatasetImpl.java         |  8 +-
 .../datanode/fsdataset/impl/FsVolumeList.java | 39 +++++++-
 .../apache/hadoop/hdfs/UpgradeUtilities.java  |  8 +-
 5 files changed, 138 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
index facf58ed5a..bade7aea58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
@@ -95,3 +95,5 @@ HDFS-5535 subtasks:
     #testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal)
 
     HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)
+
+    HDFS-5498. Improve datanode startup time. (kihwal)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index e3e441028d..f77f12b886 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
@@ -44,6 +45,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -60,6 +62,9 @@ class BlockPoolSlice {
   private final LDir finalizedDir; // directory store Finalized replica
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
+  private static String DU_CACHE_FILE = "dfsUsed";
+  private volatile boolean dfsUsedSaved = false;
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -110,8 +115,21 @@ class BlockPoolSlice {
         throw new IOException("Mkdirs failed to create " + tmpDir.toString());
       }
     }
-    this.dfsUsage = new DU(bpDir, conf);
+    // Use cached value initially if available. Or the following call will
+    // block until the initial du command completes.
+    this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
     this.dfsUsage.start();
+
+    // Make the dfs usage to be saved during shutdown.
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable() {
+        @Override
+        public void run() {
+          if (!dfsUsedSaved) {
+            saveDfsUsed();
+          }
+        }
+      }, SHUTDOWN_HOOK_PRIORITY);
   }
 
   File getDirectory() {
@@ -135,6 +153,74 @@ class BlockPoolSlice {
     return dfsUsage.getUsed();
   }
 
+  /**
+   * Read in the cached DU value and return it if it is less than 600 seconds
+   * old (DU update interval). Slight imprecision of dfsUsed is not critical
+   * and skipping DU can significantly shorten the startup time.
+   * If the cached value is not available or too old, -1 is returned.
+   */
+  long loadDfsUsed() {
+    long cachedDfsUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(new File(currentDir, DU_CACHE_FILE));
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded dfsUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedDfsUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+        FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
+            cachedDfsUsed);
+        return cachedDfsUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current dfsUsed to the cache file.
+   */
+  void saveDfsUsed() {
+    File outFile = new File(currentDir, DU_CACHE_FILE);
+    if (outFile.exists()) {
+      outFile.delete();
+    }
+
+    try {
+      long used = getDfsUsed();
+      if (used > 0) {
+        FileWriter out = new FileWriter(outFile);
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.flush();
+        out.close();
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      FsDatasetImpl.LOG.warn("Failed to write dfsUsed to " + outFile, ioe);
+    }
+  }
+
   /**
    * Temporary files. They get moved to the finalized block directory when
    * the block is finalized.
@@ -210,6 +296,7 @@ class BlockPoolSlice {
                 genStamp, volume, blockFile.getParentFile(), null);
             loadRwr = false;
           }
+          sc.close();
           restartMeta.delete();
         } catch (FileNotFoundException fnfe) {
           // nothing to do here
@@ -326,6 +413,8 @@ class BlockPoolSlice {
   }
 
   void shutdown() {
+    saveDfsUsed();
+    dfsUsedSaved = true;
     dfsUsage.shutdown();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 92f2ed990d..0f6e308f35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1727,11 +1727,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
-  public synchronized void addBlockPool(String bpid, Configuration conf)
+  public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    volumes.addBlockPool(bpid, conf);
-    volumeMap.initBlockPool(bpid);
+    synchronized(this) {
+      volumes.addBlockPool(bpid, conf);
+      volumeMap.initBlockPool(bpid);
+    }
     volumes.getAllVolumesMap(bpid, volumeMap);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 89830fcf24..9563dcc987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -96,10 +96,41 @@ class FsVolumeList {
     }
   }
 
-  void getAllVolumesMap(String bpid, ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
     long totalStartTime = System.currentTimeMillis();
-    for (FsVolumeImpl v : volumes) {
-      getVolumeMap(bpid, v, volumeMap);
+    final List<IOException> exceptions = Collections.synchronizedList(
+        new ArrayList<IOException>());
+    List<Thread> replicaAddingThreads = new ArrayList<Thread>();
+    for (final FsVolumeImpl v : volumes) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
+                bpid + " on volume " + v + "...");
+            long startTime = System.currentTimeMillis();
+            v.getVolumeMap(bpid, volumeMap);
+            long timeTaken = System.currentTimeMillis() - startTime;
+            FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+                + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
+          } catch (IOException ioe) {
+            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
+                "from " + v + ". Will throw later.", ioe);
+            exceptions.add(ioe);
+          }
+        }
+      };
+      replicaAddingThreads.add(t);
+      t.start();
+    }
+    for (Thread t : replicaAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw exceptions.get(0);
     }
     long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
@@ -219,4 +250,4 @@ class FsVolumeList {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
index 0cab74e6c5..b4660c29ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
@@ -290,10 +290,14 @@ public class UpgradeUtilities {
       if (!list[i].isFile()) {
         continue;
       }
-      // skip VERSION file for DataNodes
-      if (nodeType == DATA_NODE && list[i].getName().equals("VERSION")) {
+
+      // skip VERSION and dfsUsed file for DataNodes
+      if (nodeType == DATA_NODE &&
+         (list[i].getName().equals("VERSION") ||
+          list[i].getName().equals("dfsUsed"))) {
        continue;
       }
+
       FileInputStream fis = null;
       try {
         fis = new FileInputStream(list[i]);
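
Editor's note (not part of the patch): the following is a minimal, standalone sketch of the dfsUsed caching scheme that the BlockPoolSlice change introduces. The cache file holds "<usedBytes> <mtime>"; the timestamp is written last so a truncated write is rejected, and a cached value is accepted only if it is younger than the 600-second DU refresh window. The class name DuCacheSketch and the use of System.currentTimeMillis() in place of Hadoop's Time.now() are illustrative assumptions, not Hadoop APIs.

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Scanner;

public class DuCacheSketch {
  private static final String CACHE_FILE = "dfsUsed";
  private static final long MAX_AGE_MS = 600000L; // same 600s window as the patch

  /** Returns the cached value, or -1 if the file is missing, malformed, or stale. */
  static long load(File dir) {
    try (Scanner sc = new Scanner(new File(dir, CACHE_FILE))) {
      if (!sc.hasNextLong()) return -1;
      long cachedUsed = sc.nextLong();
      if (!sc.hasNextLong()) return -1;   // mtime is written last, so a
      long mtime = sc.nextLong();         // truncated file is rejected here
      if (mtime > 0 && System.currentTimeMillis() - mtime < MAX_AGE_MS) {
        return cachedUsed;
      }
      return -1;
    } catch (FileNotFoundException fnfe) {
      return -1;
    }
  }

  /** Persists the usage value with the current time; best effort only. */
  static void save(File dir, long used) {
    if (used <= 0) return;
    File cache = new File(dir, CACHE_FILE);
    try (FileWriter out = new FileWriter(cache)) {
      out.write(used + " " + System.currentTimeMillis());
    } catch (IOException ioe) {
      // The cache is an optimization; a failed write only costs a du at startup.
      System.err.println("Failed to write " + cache + ": " + ioe);
    }
  }

  public static void main(String[] args) {
    File dir = new File(".");
    save(dir, 123456789L);
    System.out.println("cached dfsUsed = " + load(dir)); // prints the saved value
  }
}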
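
A second standalone sketch (also not part of the patch) shows the parallel replica-scan pattern used in the FsVolumeList change: one worker thread per volume, join them all, collect IOExceptions in a synchronized list, and rethrow the first one afterwards. The class ParallelVolumeScanSketch, the scanVolume placeholder, and the volume paths are made up for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParallelVolumeScanSketch {

  /** Placeholder for the per-volume work (v.getVolumeMap(...) in the patch). */
  static void scanVolume(String volume) throws IOException {
    System.out.println("scanning " + volume + " on " +
        Thread.currentThread().getName());
  }

  static void scanAll(List<String> volumes) throws IOException {
    final List<IOException> exceptions =
        Collections.synchronizedList(new ArrayList<IOException>());
    List<Thread> workers = new ArrayList<Thread>();
    for (final String v : volumes) {
      Thread t = new Thread() {
        @Override
        public void run() {
          try {
            scanVolume(v);
          } catch (IOException ioe) {
            exceptions.add(ioe); // remember it; rethrown after all joins
          }
        }
      };
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0); // surface the first failure after all volumes finish
    }
  }

  public static void main(String[] args) throws IOException {
    scanAll(Arrays.asList("/data/1", "/data/2", "/data/3"));
  }
}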