From a9331fe9b071fdcdae0c6c747d7b6b306142e671 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Thu, 30 Oct 2014 17:31:23 -0700 Subject: [PATCH] HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/common/Storage.java | 15 + .../hdfs/server/common/StorageInfo.java | 6 +- .../datanode/BlockPoolSliceStorage.java | 168 +++++--- .../hadoop/hdfs/server/datanode/DataNode.java | 113 +++-- .../hdfs/server/datanode/DataStorage.java | 392 +++++++++--------- .../datanode/fsdataset/FsDatasetSpi.java | 6 +- .../fsdataset/impl/FsDatasetImpl.java | 161 +++---- .../server/datanode/SimulatedFSDataset.java | 7 +- .../datanode/TestDataNodeHotSwapVolumes.java | 108 ++++- .../hdfs/server/datanode/TestDataStorage.java | 26 +- .../fsdataset/impl/TestFsDatasetImpl.java | 29 +- 12 files changed, 587 insertions(+), 447 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9f331d9a37..438ed6663e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -322,6 +322,9 @@ Release 2.7.0 - UNRELEASED HDFS-3342. SocketTimeoutException in BlockSender.sendChunks could have a better error message. (Yongjun Zhang via wang) + HDFS-7035. Make adding a new data directory to the DataNode an atomic + operation and improve error handling (Lei Xu via Colin P. McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 913d890773..f83cf3b004 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -820,6 +820,21 @@ protected void addStorageDir(StorageDirectory sd) { storageDirs.add(sd); } + /** + * Returns true if the storage directory on the given directory is already + * loaded. + * @param root the root directory of a {@link StorageDirectory} + * @throws IOException if failed to get canonical path. + */ + protected boolean containsStorageDir(File root) throws IOException { + for (StorageDirectory sd : storageDirs) { + if (sd.getRoot().getCanonicalPath().equals(root.getCanonicalPath())) { + return true; + } + } + return false; + } + /** * Return true if the layout of the given storage directory is from a version * of Hadoop prior to the introduction of the "current" and "previous" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java index 545fb132c0..f40b079ffd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java @@ -216,7 +216,11 @@ protected void setNamespaceID(Properties props, StorageDirectory sd) } namespaceID = nsId; } - + + public void setServiceLayoutVersion(int lv) { + this.layoutVersion = lv; + } + public int getServiceLayoutVersion() { return storageType == NodeType.DATA_NODE ? HdfsConstants.DATANODE_LAYOUT_VERSION : HdfsConstants.NAMENODE_LAYOUT_VERSION; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index 8333bb4af5..8c819a754d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; @@ -35,9 +36,7 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Properties; @@ -127,9 +126,110 @@ private BlockPoolSliceStorage() { new ConcurrentHashMap()); } + // Expose visibility for VolumeBuilder#commit(). + public void addStorageDir(StorageDirectory sd) { + super.addStorageDir(sd); + } + + /** + * Load one storage directory. Recover from previous transitions if required. + * + * @param datanode datanode instance + * @param nsInfo namespace information + * @param dataDir the root path of the storage directory + * @param startOpt startup option + * @return the StorageDirectory successfully loaded. + * @throws IOException + */ + private StorageDirectory loadStorageDirectory(DataNode datanode, + NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException { + StorageDirectory sd = new StorageDirectory(dataDir, null, true); + try { + StorageState curState = sd.analyzeStorage(startOpt, this); + // sd is locked but not opened + switch (curState) { + case NORMAL: + break; + case NON_EXISTENT: + LOG.info("Block pool storage directory " + dataDir + " does not exist"); + throw new IOException("Storage directory " + dataDir + + " does not exist"); + case NOT_FORMATTED: // format + LOG.info("Block pool storage directory " + dataDir + + " is not formatted for " + nsInfo.getBlockPoolID()); + LOG.info("Formatting ..."); + format(sd, nsInfo); + break; + default: // recovery part is common + sd.doRecover(curState); + } + + // 2. Do transitions + // Each storage directory is treated individually. + // During startup some of them can upgrade or roll back + // while others could be up-to-date for the regular startup. + doTransition(datanode, sd, nsInfo, startOpt); + if (getCTime() != nsInfo.getCTime()) { + throw new IOException( + "Data-node and name-node CTimes must be the same."); + } + + // 3. Update successfully loaded storage. + setServiceLayoutVersion(getServiceLayoutVersion()); + writeProperties(sd); + + return sd; + } catch (IOException ioe) { + sd.unlock(); + throw ioe; + } + } + + /** + * Analyze and load storage directories. Recover from previous transitions if + * required. + * + * The block pool storages are either all analyzed or none of them is loaded. + * Therefore, a failure on loading any block pool storage results a faulty + * data volume. + * + * @param datanode Datanode to which this storage belongs to + * @param nsInfo namespace information + * @param dataDirs storage directories of block pool + * @param startOpt startup option + * @return an array of loaded block pool directories. + * @throws IOException on error + */ + List loadBpStorageDirectories( + DataNode datanode, NamespaceInfo nsInfo, + Collection dataDirs, StartupOption startOpt) throws IOException { + List succeedDirs = Lists.newArrayList(); + try { + for (File dataDir : dataDirs) { + if (containsStorageDir(dataDir)) { + throw new IOException( + "BlockPoolSliceStorage.recoverTransitionRead: " + + "attempt to load an used block storage: " + dataDir); + } + StorageDirectory sd = + loadStorageDirectory(datanode, nsInfo, dataDir, startOpt); + succeedDirs.add(sd); + } + } catch (IOException e) { + LOG.warn("Failed to analyze storage directories for block pool " + + nsInfo.getBlockPoolID(), e); + throw e; + } + return succeedDirs; + } + /** * Analyze storage directories. Recover from previous transitions if required. - * + * + * The block pool storages are either all analyzed or none of them is loaded. + * Therefore, a failure on loading any block pool storage results a faulty + * data volume. + * * @param datanode Datanode to which this storage belongs to * @param nsInfo namespace information * @param dataDirs storage directories of block pool @@ -139,68 +239,10 @@ private BlockPoolSliceStorage() { void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); - Set existingStorageDirs = new HashSet(); - for (int i = 0; i < getNumStorageDirs(); i++) { - existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); - } - - // 1. For each BP data directory analyze the state and - // check whether all is consistent before transitioning. - ArrayList dataDirStates = new ArrayList( - dataDirs.size()); - for (Iterator it = dataDirs.iterator(); it.hasNext();) { - File dataDir = it.next(); - if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { - LOG.info("Storage directory " + dataDir + " has already been used."); - it.remove(); - continue; - } - StorageDirectory sd = new StorageDirectory(dataDir, null, true); - StorageState curState; - try { - curState = sd.analyzeStorage(startOpt, this); - // sd is locked but not opened - switch (curState) { - case NORMAL: - break; - case NON_EXISTENT: - // ignore this storage - LOG.info("Storage directory " + dataDir + " does not exist."); - it.remove(); - continue; - case NOT_FORMATTED: // format - LOG.info("Storage directory " + dataDir + " is not formatted."); - LOG.info("Formatting ..."); - format(sd, nsInfo); - break; - default: // recovery part is common - sd.doRecover(curState); - } - } catch (IOException ioe) { - sd.unlock(); - throw ioe; - } - // add to the storage list. This is inherited from parent class, Storage. + for (StorageDirectory sd : loadBpStorageDirectories( + datanode, nsInfo, dataDirs, startOpt)) { addStorageDir(sd); - dataDirStates.add(curState); } - - if (dataDirs.size() == 0) // none of the data dirs exist - throw new IOException( - "All specified directories are not accessible or do not exist."); - - // 2. Do transitions - // Each storage directory is treated individually. - // During startup some of them can upgrade or roll back - // while others could be up-to-date for the regular startup. - for (int idx = 0; idx < getNumStorageDirs(); idx++) { - doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); - assert getCTime() == nsInfo.getCTime() - : "Data-node and name-node CTimes must be the same."; - } - - // 3. Update all storages. Some of them might have just been formatted. - this.writeAll(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index eeda2375f1..6bd27fa292 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -44,12 +44,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.BufferedOutputStream; @@ -81,6 +77,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; @@ -182,7 +182,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.tracing.TraceAdminPB; import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService; import org.apache.hadoop.tracing.TraceAdminProtocolPB; import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB; @@ -451,20 +450,27 @@ public Collection getReconfigurableProperties() { */ @VisibleForTesting static class ChangedVolumes { + /** The storage locations of the newly added volumes. */ List newLocations = Lists.newArrayList(); + /** The storage locations of the volumes that are removed. */ List deactivateLocations = Lists.newArrayList(); + /** The unchanged locations that existed in the old configuration. */ + List unchangedLocations = Lists.newArrayList(); } /** * Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect * changed volumes. + * @param newVolumes a comma separated string that specifies the data volumes. * @return changed volumes. * @throws IOException if none of the directories are specified in the * configuration. */ @VisibleForTesting - ChangedVolumes parseChangedVolumes() throws IOException { - List locations = getStorageLocations(getConf()); + ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException { + Configuration conf = new Configuration(); + conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes); + List locations = getStorageLocations(conf); if (locations.isEmpty()) { throw new IOException("No directory is specified."); @@ -479,9 +485,11 @@ ChangedVolumes parseChangedVolumes() throws IOException { boolean found = false; for (Iterator sl = results.newLocations.iterator(); sl.hasNext(); ) { - if (sl.next().getFile().getCanonicalPath().equals( + StorageLocation location = sl.next(); + if (location.getFile().getCanonicalPath().equals( dir.getRoot().getCanonicalPath())) { sl.remove(); + results.unchangedLocations.add(location); found = true; break; } @@ -499,18 +507,21 @@ ChangedVolumes parseChangedVolumes() throws IOException { /** * Attempts to reload data volumes with new configuration. * @param newVolumes a comma separated string that specifies the data volumes. - * @throws Exception + * @throws IOException on error. If an IOException is thrown, some new volumes + * may have been successfully added and removed. */ - private synchronized void refreshVolumes(String newVolumes) throws Exception { + private synchronized void refreshVolumes(String newVolumes) throws IOException { Configuration conf = getConf(); conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes); - List locations = getStorageLocations(conf); - - final int numOldDataDirs = dataDirs.size(); - dataDirs = locations; - ChangedVolumes changedVolumes = parseChangedVolumes(); + int numOldDataDirs = dataDirs.size(); + ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes); StringBuilder errorMessageBuilder = new StringBuilder(); + List effectiveVolumes = Lists.newArrayList(); + for (StorageLocation sl : changedVolumes.unchangedLocations) { + effectiveVolumes.add(sl.toString()); + } + try { if (numOldDataDirs + changedVolumes.newLocations.size() - changedVolumes.deactivateLocations.size() <= 0) { @@ -521,34 +532,43 @@ private synchronized void refreshVolumes(String newVolumes) throws Exception { Joiner.on(",").join(changedVolumes.newLocations)); // Add volumes for each Namespace + final List nsInfos = Lists.newArrayList(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { - NamespaceInfo nsInfo = bpos.getNamespaceInfo(); - LOG.info("Loading volumes for namesapce: " + nsInfo.getNamespaceID()); - storage.addStorageLocations( - this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP); + nsInfos.add(bpos.getNamespaceInfo()); } - List bpids = Lists.newArrayList(); - for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { - bpids.add(bpos.getBlockPoolId()); - } - List succeedVolumes = - data.addVolumes(changedVolumes.newLocations, bpids); - - if (succeedVolumes.size() < changedVolumes.newLocations.size()) { - List failedVolumes = Lists.newArrayList(); - // Clean all failed volumes. - for (StorageLocation location : changedVolumes.newLocations) { - if (!succeedVolumes.contains(location)) { - errorMessageBuilder.append("FAILED TO ADD:"); - failedVolumes.add(location); - } else { - errorMessageBuilder.append("ADDED:"); + ExecutorService service = Executors.newFixedThreadPool( + changedVolumes.newLocations.size()); + List> exceptions = Lists.newArrayList(); + for (final StorageLocation location : changedVolumes.newLocations) { + exceptions.add(service.submit(new Callable() { + @Override + public IOException call() { + try { + data.addVolume(location, nsInfos); + } catch (IOException e) { + return e; + } + return null; } - errorMessageBuilder.append(location); - errorMessageBuilder.append("\n"); + })); + } + + for (int i = 0; i < changedVolumes.newLocations.size(); i++) { + StorageLocation volume = changedVolumes.newLocations.get(i); + Future ioExceptionFuture = exceptions.get(i); + try { + IOException ioe = ioExceptionFuture.get(); + if (ioe != null) { + errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s\n", + volume.toString(), ioe.getMessage())); + } else { + effectiveVolumes.add(volume.toString()); + } + LOG.info("Storage directory is loaded: " + volume.toString()); + } catch (Exception e) { + errorMessageBuilder.append(String.format("FAILED to ADD: %s: %s\n", + volume.toString(), e.getMessage())); } - storage.removeVolumes(failedVolumes); - data.removeVolumes(failedVolumes); } } @@ -557,15 +577,20 @@ private synchronized void refreshVolumes(String newVolumes) throws Exception { Joiner.on(",").join(changedVolumes.deactivateLocations)); data.removeVolumes(changedVolumes.deactivateLocations); - storage.removeVolumes(changedVolumes.deactivateLocations); + try { + storage.removeVolumes(changedVolumes.deactivateLocations); + } catch (IOException e) { + errorMessageBuilder.append(e.getMessage()); + } } if (errorMessageBuilder.length() > 0) { throw new IOException(errorMessageBuilder.toString()); } - } catch (IOException e) { - LOG.warn("There is IOException when refresh volumes! ", e); - throw e; + } finally { + conf.set(DFS_DATANODE_DATA_DIR_KEY, + Joiner.on(",").join(effectiveVolumes)); + dataDirs = getStorageLocations(conf); } } @@ -1304,7 +1329,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException { final String bpid = nsInfo.getBlockPoolID(); //read storage info, lock data dirs and transition fs state if necessary synchronized (this) { - storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt); + storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt); } final StorageInfo bpStorage = storage.getBPStorage(bpid); LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 99eedb1f9b..c90ef954d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -188,15 +191,147 @@ public String getTrashDirectoryForBlockFile(String bpid, File blockFile) { } return null; } - + /** - * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}} + * VolumeBuilder holds the metadata (e.g., the storage directories) of the + * prepared volume returned from {@link prepareVolume()}. Calling {@link build()} + * to add the metadata to {@link DataStorage} so that this prepared volume can + * be active. */ - private void writeAll(Collection dirs) throws IOException { - this.layoutVersion = getServiceLayoutVersion(); - for (StorageDirectory dir : dirs) { - writeProperties(dir); + @InterfaceAudience.Private + @InterfaceStability.Unstable + static public class VolumeBuilder { + private DataStorage storage; + /** Volume level storage directory. */ + private StorageDirectory sd; + /** Mapping from block pool ID to an array of storage directories. */ + private Map> bpStorageDirMap = + Maps.newHashMap(); + + @VisibleForTesting + public VolumeBuilder(DataStorage storage, StorageDirectory sd) { + this.storage = storage; + this.sd = sd; } + + public final StorageDirectory getStorageDirectory() { + return this.sd; + } + + private void addBpStorageDirectories(String bpid, + List dirs) { + bpStorageDirMap.put(bpid, dirs); + } + + /** + * Add loaded metadata of a data volume to {@link DataStorage}. + */ + public void build() { + assert this.sd != null; + synchronized (storage) { + for (Map.Entry> e : + bpStorageDirMap.entrySet()) { + final String bpid = e.getKey(); + BlockPoolSliceStorage bpStorage = this.storage.bpStorageMap.get(bpid); + assert bpStorage != null; + for (StorageDirectory bpSd : e.getValue()) { + bpStorage.addStorageDir(bpSd); + } + } + storage.addStorageDir(sd); + } + } + } + + private StorageDirectory loadStorageDirectory(DataNode datanode, + NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) + throws IOException { + StorageDirectory sd = new StorageDirectory(dataDir, null, false); + try { + StorageState curState = sd.analyzeStorage(startOpt, this); + // sd is locked but not opened + switch (curState) { + case NORMAL: + break; + case NON_EXISTENT: + LOG.info("Storage directory " + dataDir + " does not exist"); + throw new IOException("Storage directory " + dataDir + + " does not exist"); + case NOT_FORMATTED: // format + LOG.info("Storage directory " + dataDir + " is not formatted for " + + nsInfo.getBlockPoolID()); + LOG.info("Formatting ..."); + format(sd, nsInfo, datanode.getDatanodeUuid()); + break; + default: // recovery part is common + sd.doRecover(curState); + } + + // 2. Do transitions + // Each storage directory is treated individually. + // During startup some of them can upgrade or roll back + // while others could be up-to-date for the regular startup. + doTransition(datanode, sd, nsInfo, startOpt); + + // 3. Update successfully loaded storage. + setServiceLayoutVersion(getServiceLayoutVersion()); + writeProperties(sd); + + return sd; + } catch (IOException ioe) { + sd.unlock(); + throw ioe; + } + } + + /** + * Prepare a storage directory. It creates a builder which can be used to add + * to the volume. If the volume cannot be added, it is OK to discard the + * builder later. + * + * @param datanode DataNode object. + * @param volume the root path of a storage directory. + * @param nsInfos an array of namespace infos. + * @return a VolumeBuilder that holds the metadata of this storage directory + * and can be added to DataStorage later. + * @throws IOException if encounters I/O errors. + * + * Note that if there is IOException, the state of DataStorage is not modified. + */ + public VolumeBuilder prepareVolume(DataNode datanode, File volume, + List nsInfos) throws IOException { + if (containsStorageDir(volume)) { + final String errorMessage = "Storage directory is in use"; + LOG.warn(errorMessage + "."); + throw new IOException(errorMessage); + } + + StorageDirectory sd = loadStorageDirectory( + datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP); + VolumeBuilder builder = + new VolumeBuilder(this, sd); + for (NamespaceInfo nsInfo : nsInfos) { + List bpDataDirs = Lists.newArrayList(); + bpDataDirs.add(BlockPoolSliceStorage.getBpRoot( + nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT))); + makeBlockPoolDataDir(bpDataDirs, null); + + BlockPoolSliceStorage bpStorage; + final String bpid = nsInfo.getBlockPoolID(); + synchronized (this) { + bpStorage = this.bpStorageMap.get(bpid); + if (bpStorage == null) { + bpStorage = new BlockPoolSliceStorage( + nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), + nsInfo.getClusterID()); + addBlockPoolStorage(bpid, bpStorage); + } + } + builder.addBpStorageDirectories( + bpid, bpStorage.loadBpStorageDirectories( + datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP)); + } + return builder; } /** @@ -207,157 +342,64 @@ private void writeAll(Collection dirs) throws IOException { * @param nsInfo namespace information * @param dataDirs array of data storage directories * @param startOpt startup option + * @return a list of successfully loaded volumes. * @throws IOException */ - synchronized void addStorageLocations(DataNode datanode, + @VisibleForTesting + synchronized List addStorageLocations(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, - StartupOption startOpt) - throws IOException { - // Similar to recoverTransitionRead, it first ensures the datanode level - // format is completed. - List tmpDataDirs = - new ArrayList(dataDirs); - addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true); - - Collection bpDataDirs = new ArrayList(); - String bpid = nsInfo.getBlockPoolID(); - for (StorageLocation dir : dataDirs) { - File dnRoot = dir.getFile(); - File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot, - STORAGE_DIR_CURRENT)); - bpDataDirs.add(bpRoot); - } - // mkdir for the list of BlockPoolStorage - makeBlockPoolDataDir(bpDataDirs, null); - BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); - if (bpStorage == null) { - bpStorage = new BlockPoolSliceStorage( - nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), - nsInfo.getClusterID()); - } - - bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); - addBlockPoolStorage(bpid, bpStorage); - } - - /** - * Add a list of volumes to be managed by this DataStorage. If the volume is - * empty, it formats the volume, otherwise it recovers it from previous - * transitions if required. - * - * If isInitialize is false, only the directories that have finished the - * doTransition() process will be added into DataStorage. - * - * @param datanode the reference to DataNode. - * @param nsInfo namespace information - * @param dataDirs array of data storage directories - * @param startOpt startup option - * @param isInitialize whether it is called when DataNode starts up. - * @throws IOException - */ - private synchronized void addStorageLocations(DataNode datanode, - NamespaceInfo nsInfo, Collection dataDirs, - StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs) - throws IOException { - Set existingStorageDirs = new HashSet(); - for (int i = 0; i < getNumStorageDirs(); i++) { - existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); - } - - // 1. For each data directory calculate its state and check whether all is - // consistent before transitioning. Format and recover. - ArrayList dataDirStates = - new ArrayList(dataDirs.size()); - List addedStorageDirectories = - new ArrayList(); - for(Iterator it = dataDirs.iterator(); it.hasNext();) { - File dataDir = it.next().getFile(); - if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { + StartupOption startOpt) throws IOException { + final String bpid = nsInfo.getBlockPoolID(); + List successVolumes = Lists.newArrayList(); + for (StorageLocation dataDir : dataDirs) { + File root = dataDir.getFile(); + if (!containsStorageDir(root)) { + try { + // It first ensures the datanode level format is completed. + StorageDirectory sd = loadStorageDirectory( + datanode, nsInfo, root, startOpt); + addStorageDir(sd); + } catch (IOException e) { + LOG.warn(e); + continue; + } + } else { LOG.info("Storage directory " + dataDir + " has already been used."); - it.remove(); - continue; } - StorageDirectory sd = new StorageDirectory(dataDir); - StorageState curState; + + List bpDataDirs = new ArrayList(); + bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root, + STORAGE_DIR_CURRENT))); try { - curState = sd.analyzeStorage(startOpt, this); - // sd is locked but not opened - switch (curState) { - case NORMAL: - break; - case NON_EXISTENT: - // ignore this storage - LOG.info("Storage directory " + dataDir + " does not exist"); - it.remove(); - continue; - case NOT_FORMATTED: // format - LOG.info("Storage directory " + dataDir + " is not formatted for " - + nsInfo.getBlockPoolID()); - LOG.info("Formatting ..."); - format(sd, nsInfo, datanode.getDatanodeUuid()); - break; - default: // recovery part is common - sd.doRecover(curState); + makeBlockPoolDataDir(bpDataDirs, null); + BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); + if (bpStorage == null) { + bpStorage = new BlockPoolSliceStorage( + nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), + nsInfo.getClusterID()); } - } catch (IOException ioe) { - sd.unlock(); - LOG.warn("Ignoring storage directory " + dataDir - + " due to an exception", ioe); - //continue with other good dirs - continue; - } - if (isInitialize) { - addStorageDir(sd); - } - addedStorageDirectories.add(sd); - dataDirStates.add(curState); - } - if (dataDirs.size() == 0 || dataDirStates.size() == 0) { - // none of the data dirs exist - if (ignoreExistingDirs) { - return; - } - throw new IOException( - "All specified directories are not accessible or do not exist."); - } - - // 2. Do transitions - // Each storage directory is treated individually. - // During startup some of them can upgrade or rollback - // while others could be up-to-date for the regular startup. - for (Iterator it = addedStorageDirectories.iterator(); - it.hasNext(); ) { - StorageDirectory sd = it.next(); - try { - doTransition(datanode, sd, nsInfo, startOpt); - createStorageID(sd); + bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); + addBlockPoolStorage(bpid, bpStorage); } catch (IOException e) { - if (!isInitialize) { - sd.unlock(); - it.remove(); - continue; - } - unlockAll(); - throw e; + LOG.warn("Failed to add storage for block pool: " + bpid + " : " + + e.getMessage()); + continue; } + successVolumes.add(dataDir); } - - // 3. Update all successfully loaded storages. Some of them might have just - // been formatted. - this.writeAll(addedStorageDirectories); - - // 4. Make newly loaded storage directories visible for service. - if (!isInitialize) { - this.storageDirs.addAll(addedStorageDirectories); - } + return successVolumes; } /** - * Remove volumes from DataStorage. + * Remove volumes from DataStorage. All volumes are removed even when the + * IOException is thrown. + * * @param locations a collection of volumes. + * @throws IOException if I/O error when unlocking storage directory. */ - synchronized void removeVolumes(Collection locations) { + synchronized void removeVolumes(Collection locations) + throws IOException { if (locations.isEmpty()) { return; } @@ -371,6 +413,7 @@ synchronized void removeVolumes(Collection locations) { bpsStorage.removeVolumes(dataDirs); } + StringBuilder errorMsgBuilder = new StringBuilder(); for (Iterator it = this.storageDirs.iterator(); it.hasNext(); ) { StorageDirectory sd = it.next(); @@ -382,13 +425,18 @@ synchronized void removeVolumes(Collection locations) { LOG.warn(String.format( "I/O error attempting to unlock storage directory %s.", sd.getRoot()), e); + errorMsgBuilder.append(String.format("Failed to remove %s: %s\n", + sd.getRoot(), e.getMessage())); } } } + if (errorMsgBuilder.length() > 0) { + throw new IOException(errorMsgBuilder.toString()); + } } /** - * Analyze storage directories. + * Analyze storage directories for a specific block pool. * Recover from previous transitions if required. * Perform fs state transition if necessary depending on the namespace info. * Read storage info. @@ -396,60 +444,25 @@ synchronized void removeVolumes(Collection locations) { * This method should be synchronized between multiple DN threads. Only the * first DN thread does DN level storage dir recoverTransitionRead. * - * @param nsInfo namespace information - * @param dataDirs array of data storage directories - * @param startOpt startup option - * @throws IOException - */ - synchronized void recoverTransitionRead(DataNode datanode, - NamespaceInfo nsInfo, Collection dataDirs, - StartupOption startOpt) - throws IOException { - if (initialized) { - // DN storage has been initialized, no need to do anything - return; - } - LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION - + " and NameNode layout version: " + nsInfo.getLayoutVersion()); - - this.storageDirs = new ArrayList(dataDirs.size()); - addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false); - - // mark DN storage is initialized - this.initialized = true; - } - - /** - * recoverTransitionRead for a specific block pool - * * @param datanode DataNode - * @param bpID Block pool Id * @param nsInfo Namespace info of namenode corresponding to the block pool * @param dataDirs Storage directories * @param startOpt startup option * @throws IOException on error */ - void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo, + void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { - // First ensure datanode level format/snapshot/rollback is completed - recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt); - - // Create list of storage directories for the block pool - Collection bpDataDirs = new ArrayList(); - for(StorageLocation dir : dataDirs) { - File dnRoot = dir.getFile(); - File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot, - STORAGE_DIR_CURRENT)); - bpDataDirs.add(bpRoot); + if (this.initialized) { + LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION + + " and NameNode layout version: " + nsInfo.getLayoutVersion()); + this.storageDirs = new ArrayList(dataDirs.size()); + // mark DN storage is initialized + this.initialized = true; } - // mkdir for the list of BlockPoolStorage - makeBlockPoolDataDir(bpDataDirs, null); - BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage( - nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID()); - - bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); - addBlockPoolStorage(bpID, bpStorage); + if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) { + throw new IOException("All specified directories are failed to load."); + } } /** @@ -665,12 +678,15 @@ private void doTransition( DataNode datanode, // meaningful at BlockPoolSliceStorage level. // regular start up. - if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) + if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) { + createStorageID(sd); return; // regular startup + } // do upgrade if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) { doUpgrade(datanode, sd, nsInfo); // upgrade + createStorageID(sd); return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 849c80e003..a02ee0a434 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -100,8 +101,9 @@ public RollingLogs createRollingLogs(String bpid, String prefix public List getVolumes(); /** Add an array of StorageLocation to FsDataset. */ - public List addVolumes(List volumes, - final Collection bpids); + public void addVolume( + final StorageLocation location, + final List nsInfos) throws IOException; /** Removes a collection of volumes from FsDataset. */ public void removeVolumes(Collection volumes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 73966b7498..4a89778ad3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -31,7 +31,6 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -94,6 +93,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; @@ -307,30 +307,39 @@ private void addVolume(Collection dataLocations, ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); - volumeMap.addAll(tempVolumeMap); - volumes.addVolume(fsVolume); - storageMap.put(sd.getStorageUuid(), - new DatanodeStorage(sd.getStorageUuid(), - DatanodeStorage.State.NORMAL, - storageType)); - asyncDiskService.addVolume(sd.getCurrentDir()); + synchronized (this) { + volumeMap.addAll(tempVolumeMap); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(sd.getCurrentDir()); + volumes.addVolume(fsVolume); + } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); } - private void addVolumeAndBlockPool(Collection dataLocations, - Storage.StorageDirectory sd, final Collection bpids) + @Override + public void addVolume(final StorageLocation location, + final List nsInfos) throws IOException { - final File dir = sd.getCurrentDir(); - final StorageType storageType = - getStorageTypeFromLocations(dataLocations, sd.getRoot()); + final File dir = location.getFile(); + // Prepare volume in DataStorage + DataStorage.VolumeBuilder builder = + dataStorage.prepareVolume(datanode, location.getFile(), nsInfos); + + final Storage.StorageDirectory sd = builder.getStorageDirectory(); + + StorageType storageType = location.getStorageType(); final FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume); + ArrayList exceptions = Lists.newArrayList(); - List exceptions = Lists.newArrayList(); - for (final String bpid : bpids) { + for (final NamespaceInfo nsInfo : nsInfos) { + String bpid = nsInfo.getBlockPoolID(); try { fsVolume.addBlockPool(bpid, this.conf); fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker); @@ -341,91 +350,24 @@ private void addVolumeAndBlockPool(Collection dataLocations, } } if (!exceptions.isEmpty()) { - // The states of FsDatasteImpl are not modified, thus no need to rolled back. throw MultipleIOException.createIOException(exceptions); } - volumeMap.addAll(tempVolumeMap); - storageMap.put(sd.getStorageUuid(), - new DatanodeStorage(sd.getStorageUuid(), - DatanodeStorage.State.NORMAL, - storageType)); - asyncDiskService.addVolume(sd.getCurrentDir()); - volumes.addVolume(fsVolume); + setupAsyncLazyPersistThread(fsVolume); + builder.build(); + synchronized (this) { + volumeMap.addAll(tempVolumeMap); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(sd.getCurrentDir()); + volumes.addVolume(fsVolume); + } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); } - /** - * Add an array of StorageLocation to FsDataset. - * - * @pre dataStorage must have these volumes. - * @param volumes an array of storage locations for adding volumes. - * @param bpids block pool IDs. - * @return an array of successfully loaded volumes. - */ - @Override - public synchronized List addVolumes( - final List volumes, final Collection bpids) { - final Collection dataLocations = - DataNode.getStorageLocations(this.conf); - final Map allStorageDirs = - new HashMap(); - List succeedVolumes = Lists.newArrayList(); - try { - for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { - Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); - allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd); - } - } catch (IOException ioe) { - LOG.warn("Caught exception when parsing storage URL.", ioe); - return succeedVolumes; - } - - final boolean[] successFlags = new boolean[volumes.size()]; - Arrays.fill(successFlags, false); - List volumeAddingThreads = Lists.newArrayList(); - for (int i = 0; i < volumes.size(); i++) { - final int idx = i; - Thread t = new Thread() { - public void run() { - StorageLocation vol = volumes.get(idx); - try { - String key = vol.getFile().getCanonicalPath(); - if (!allStorageDirs.containsKey(key)) { - LOG.warn("Attempt to add an invalid volume: " + vol.getFile()); - } else { - addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key), - bpids); - successFlags[idx] = true; - } - } catch (IOException e) { - LOG.warn("Caught exception when adding volume " + vol, e); - } - } - }; - volumeAddingThreads.add(t); - t.start(); - } - - for (Thread t : volumeAddingThreads) { - try { - t.join(); - } catch (InterruptedException e) { - LOG.warn("Caught InterruptedException when adding volume.", e); - } - } - - setupAsyncLazyPersistThreads(); - - for (int i = 0; i < volumes.size(); i++) { - if (successFlags[i]) { - succeedVolumes.add(volumes.get(i)); - } - } - return succeedVolumes; - } - /** * Removes a collection of volumes from FsDataset. * @param volumes the root directories of the volumes. @@ -2476,24 +2418,27 @@ private boolean ramDiskConfigured() { // added or removed. // This should only be called when the FsDataSetImpl#volumes list is finalized. private void setupAsyncLazyPersistThreads() { - boolean ramDiskConfigured = ramDiskConfigured(); for (FsVolumeImpl v: getVolumes()){ - // Skip transient volumes - if (v.isTransientStorage()) { - continue; - } + setupAsyncLazyPersistThread(v); + } + } - // Add thread for DISK volume if RamDisk is configured - if (ramDiskConfigured && - !asyncLazyPersistService.queryVolume(v.getCurrentDir())) { - asyncLazyPersistService.addVolume(v.getCurrentDir()); - } + private void setupAsyncLazyPersistThread(final FsVolumeImpl v) { + // Skip transient volumes + if (v.isTransientStorage()) { + return; + } + boolean ramDiskConfigured = ramDiskConfigured(); + // Add thread for DISK volume if RamDisk is configured + if (ramDiskConfigured && + !asyncLazyPersistService.queryVolume(v.getCurrentDir())) { + asyncLazyPersistService.addVolume(v.getCurrentDir()); + } - // Remove thread for DISK volume if RamDisk is not configured - if (!ramDiskConfigured && - asyncLazyPersistService.queryVolume(v.getCurrentDir())) { - asyncLazyPersistService.removeVolume(v.getCurrentDir()); - } + // Remove thread for DISK volume if RamDisk is not configured + if (!ramDiskConfigured && + asyncLazyPersistService.queryVolume(v.getCurrentDir())) { + asyncLazyPersistService.removeVolume(v.getCurrentDir()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 46cb46a181..3e5034ab66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; @@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; @@ -1194,8 +1196,9 @@ public List getVolumes() { } @Override - public List addVolumes(List volumes, - final Collection bpids) { + public void addVolume( + final StorageLocation location, + final List nsInfos) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 27cfc82e62..d468493b87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.fs.BlockLocation; @@ -33,6 +35,8 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -58,9 +62,14 @@ import org.apache.commons.logging.LogFactory; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -197,10 +206,11 @@ public void testParseChangedVolumes() throws IOException { } assertFalse(oldLocations.isEmpty()); - String newPaths = "/foo/path1,/foo/path2"; - conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths); + String newPaths = oldLocations.get(0).getFile().getAbsolutePath() + + ",/foo/path1,/foo/path2"; - DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes(); + DataNode.ChangedVolumes changedVolumes = + dn.parseChangedVolumes(newPaths); List newVolumes = changedVolumes.newLocations; assertEquals(2, newVolumes.size()); assertEquals(new File("/foo/path1").getAbsolutePath(), @@ -209,21 +219,21 @@ public void testParseChangedVolumes() throws IOException { newVolumes.get(1).getFile().getAbsolutePath()); List removedVolumes = changedVolumes.deactivateLocations; - assertEquals(oldLocations.size(), removedVolumes.size()); - for (int i = 0; i < removedVolumes.size(); i++) { - assertEquals(oldLocations.get(i).getFile(), - removedVolumes.get(i).getFile()); - } + assertEquals(1, removedVolumes.size()); + assertEquals(oldLocations.get(1).getFile(), + removedVolumes.get(0).getFile()); + + assertEquals(1, changedVolumes.unchangedLocations.size()); + assertEquals(oldLocations.get(0).getFile(), + changedVolumes.unchangedLocations.get(0).getFile()); } @Test public void testParseChangedVolumesFailures() throws IOException { startDFSCluster(1, 1); DataNode dn = cluster.getDataNodes().get(0); - Configuration conf = dn.getConf(); try { - conf.set(DFS_DATANODE_DATA_DIR_KEY, ""); - dn.parseChangedVolumes(); + dn.parseChangedVolumes(""); fail("Should throw IOException: empty inputs."); } catch (IOException e) { GenericTestUtils.assertExceptionContains("No directory is specified.", e); @@ -231,7 +241,8 @@ public void testParseChangedVolumesFailures() throws IOException { } /** Add volumes to the first DataNode. */ - private void addVolumes(int numNewVolumes) throws ReconfigurationException { + private void addVolumes(int numNewVolumes) + throws ReconfigurationException, IOException { File dataDir = new File(cluster.getDataDirectory()); DataNode dn = cluster.getDataNodes().get(0); // First DataNode. Configuration conf = dn.getConf(); @@ -253,12 +264,26 @@ private void addVolumes(int numNewVolumes) throws ReconfigurationException { newVolumeDirs.add(volumeDir); volumeDir.mkdirs(); newDataDirBuf.append(","); - newDataDirBuf.append(volumeDir.toURI()); + newDataDirBuf.append( + StorageLocation.parse(volumeDir.toString()).toString()); } String newDataDir = newDataDirBuf.toString(); dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir); - assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY)); + + // Verify the configuration value is appropriately set. + String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(","); + String[] expectDataDirs = newDataDir.split(","); + assertEquals(expectDataDirs.length, effectiveDataDirs.length); + for (int i = 0; i < expectDataDirs.length; i++) { + StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]); + StorageLocation effectiveLocation = + StorageLocation.parse(effectiveDataDirs[i]); + assertEquals(expectLocation.getStorageType(), + effectiveLocation.getStorageType()); + assertEquals(expectLocation.getFile().getCanonicalFile(), + effectiveLocation.getFile().getCanonicalFile()); + } // Check that all newly created volumes are appropriately formatted. for (File volumeDir : newVolumeDirs) { @@ -439,7 +464,7 @@ public void testReplicatingAfterRemoveVolume() dn.reconfigurePropertyImpl( DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); assertFileLocksReleased( - new ArrayList(oldDirs).subList(1, oldDirs.size())); + new ArrayList(oldDirs).subList(1, oldDirs.size())); triggerDeleteReport(dn); @@ -447,6 +472,59 @@ public void testReplicatingAfterRemoveVolume() DFSTestUtil.waitReplication(fs, testFile, replFactor); } + @Test + public void testAddVolumeFailures() throws IOException { + startDFSCluster(1, 1); + final String dataDir = cluster.getDataDirectory(); + + DataNode dn = cluster.getDataNodes().get(0); + List newDirs = Lists.newArrayList(); + final int NUM_NEW_DIRS = 4; + for (int i = 0; i < NUM_NEW_DIRS; i++) { + File newVolume = new File(dataDir, "new_vol" + i); + newDirs.add(newVolume.toString()); + if (i % 2 == 0) { + // Make addVolume() fail. + newVolume.createNewFile(); + } + } + + String newValue = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY) + "," + + Joiner.on(",").join(newDirs); + try { + dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newValue); + fail("Expect to throw IOException."); + } catch (ReconfigurationException e) { + String errorMessage = e.getCause().getMessage(); + String messages[] = errorMessage.split("\\r?\\n"); + assertEquals(2, messages.length); + assertThat(messages[0], containsString("new_vol0")); + assertThat(messages[1], containsString("new_vol2")); + } + + // Make sure that vol0 and vol2's metadata are not left in memory. + FsDatasetSpi dataset = dn.getFSDataset(); + for (FsVolumeSpi volume : dataset.getVolumes()) { + assertThat(volume.getBasePath(), is(not(anyOf( + is(newDirs.get(0)), is(newDirs.get(2)))))); + } + DataStorage storage = dn.getStorage(); + for (int i = 0; i < storage.getNumStorageDirs(); i++) { + Storage.StorageDirectory sd = storage.getStorageDir(i); + assertThat(sd.getRoot().toString(), + is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2)))))); + } + + // The newly effective conf does not have vol0 and vol2. + String[] effectiveVolumes = + dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(","); + assertEquals(4, effectiveVolumes.length); + for (String ev : effectiveVolumes) { + assertThat(new File(ev).getCanonicalPath(), + is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2)))))); + } + } + /** * Asserts that the storage lock file in each given directory has been * released. This method works by trying to acquire the lock file itself. If diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java index ed322437da..c90b8e5d4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java @@ -146,14 +146,11 @@ public void testAddStorageDirectories() throws IOException, assertEquals(numLocations, storage.getNumStorageDirs()); locations = createStorageLocations(numLocations); - try { - storage.addStorageLocations(mockDN, namespaceInfos.get(0), - locations, START_OPT); - fail("Expected to throw IOException: adding active directories."); - } catch (IOException e) { - GenericTestUtils.assertExceptionContains( - "All specified directories are not accessible or do not exist.", e); - } + List addedLocation = + storage.addStorageLocations(mockDN, namespaceInfos.get(0), + locations, START_OPT); + assertTrue(addedLocation.isEmpty()); + // The number of active storage dirs has not changed, since it tries to // add the storage dirs that are under service. assertEquals(numLocations, storage.getNumStorageDirs()); @@ -169,13 +166,12 @@ public void testRecoverTransitionReadFailure() throws IOException { final int numLocations = 3; List locations = createStorageLocations(numLocations, true); - try { storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT); fail("An IOException should throw: all StorageLocations are NON_EXISTENT"); } catch (IOException e) { GenericTestUtils.assertExceptionContains( - "All specified directories are not accessible or do not exist.", e); + "All specified directories are failed to load.", e); } assertEquals(0, storage.getNumStorageDirs()); } @@ -191,9 +187,9 @@ public void testRecoverTransitionReadDoTransitionFailure() throws IOException { final int numLocations = 3; List locations = createStorageLocations(numLocations); - String bpid = nsInfo.getBlockPoolID(); // Prepare volumes - storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT); + storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT); + assertEquals(numLocations, storage.getNumStorageDirs()); // Reset DataStorage storage.unlockAll(); @@ -201,11 +197,11 @@ public void testRecoverTransitionReadDoTransitionFailure() // Trigger an exception from doTransition(). nsInfo.clusterID = "cluster1"; try { - storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT); + storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT); fail("Expect to throw an exception from doTransition()"); } catch (IOException e) { - GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e); + GenericTestUtils.assertExceptionContains("All specified directories", e); } - assertEquals(numLocations, storage.getNumStorageDirs()); + assertEquals(0, storage.getNumStorageDirs()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 7c39ca5568..956ab78205 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.junit.Before; @@ -40,7 +42,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -48,7 +49,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -64,6 +67,7 @@ public class TestFsDatasetImpl { new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE)); private Configuration conf; + private DataNode datanode; private DataStorage storage; private DataBlockScanner scanner; private FsDatasetImpl dataset; @@ -94,7 +98,7 @@ private static void createStorageDirs(DataStorage storage, Configuration conf, @Before public void setUp() throws IOException { - final DataNode datanode = Mockito.mock(DataNode.class); + datanode = Mockito.mock(DataNode.class); storage = Mockito.mock(DataStorage.class); scanner = Mockito.mock(DataBlockScanner.class); this.conf = new Configuration(); @@ -119,17 +123,24 @@ public void testAddVolumes() throws IOException { final int numNewVolumes = 3; final int numExistingVolumes = dataset.getVolumes().size(); final int totalVolumes = numNewVolumes + numExistingVolumes; - List newLocations = new ArrayList(); Set expectedVolumes = new HashSet(); + List nsInfos = Lists.newArrayList(); + for (String bpid : BLOCK_POOL_IDS) { + nsInfos.add(new NamespaceInfo(0, "cluster-id", bpid, 1)); + } for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; - newLocations.add(StorageLocation.parse(path)); - when(storage.getStorageDir(numExistingVolumes + i)) - .thenReturn(createStorageDirectory(new File(path))); - } - when(storage.getNumStorageDirs()).thenReturn(totalVolumes); + StorageLocation loc = StorageLocation.parse(path); + Storage.StorageDirectory sd = createStorageDirectory(new File(path)); + DataStorage.VolumeBuilder builder = + new DataStorage.VolumeBuilder(storage, sd); + when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), + anyListOf(NamespaceInfo.class))) + .thenReturn(builder); + + dataset.addVolume(loc, nsInfos); + } - dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS)); assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.storageMap.size());