From 9dcbdbdb5a34d85910707f81ebc1bb1f81c99978 Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Tue, 30 Aug 2016 14:00:13 -0700 Subject: [PATCH] HDFS-9392. Admins support for maintenance state. Contributed by Ming Ma. --- .../protocol/DatanodeAdminProperties.java | 19 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 27 +- .../hadoop/hdfs/protocol/HdfsConstants.java | 2 +- .../CombinedHostFileManager.java | 23 + .../blockmanagement/DatanodeManager.java | 33 +- .../server/blockmanagement/DatanodeStats.java | 10 +- .../blockmanagement/DecommissionManager.java | 101 ++- .../blockmanagement/HeartbeatManager.java | 27 + .../blockmanagement/HostConfigManager.java | 7 + .../blockmanagement/HostFileManager.java | 6 + .../hdfs/server/namenode/FSNamesystem.java | 29 + .../namenode/metrics/FSNamesystemMBean.java | 15 + .../hadoop/hdfs/AdminStatesBaseTest.java | 375 +++++++++++ .../apache/hadoop/hdfs/TestDecommission.java | 590 +++++------------- .../hadoop/hdfs/TestMaintenanceState.java | 310 +++++++++ .../namenode/TestDecommissioningStatus.java | 2 +- .../hadoop/hdfs/util/HostsFileWriter.java | 55 +- .../util/TestCombinedHostsFileReader.java | 2 +- .../src/test/resources/dfs.hosts.json | 2 + 19 files changed, 1164 insertions(+), 471 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java index 9f7b98309d..2abed81ba1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java @@ -33,6 +33,7 @@ public class DatanodeAdminProperties { private int port; private String upgradeDomain; private AdminStates adminState = AdminStates.NORMAL; + private long maintenanceExpireTimeInMS = Long.MAX_VALUE; /** * Return the host name of the datanode. @@ -97,4 +98,22 @@ public AdminStates getAdminState() { public void setAdminState(final AdminStates adminState) { this.adminState = adminState; } + + /** + * Get the maintenance expiration time in milliseconds. + * @return the maintenance expiration time in milliseconds. + */ + public long getMaintenanceExpireTimeInMS() { + return this.maintenanceExpireTimeInMS; + } + + /** + * Set the maintenance expiration time in milliseconds. + * @param maintenanceExpireTimeInMS + * the maintenance expiration time in milliseconds.
+ */ + public void setMaintenanceExpireTimeInMS( + final long maintenanceExpireTimeInMS) { + this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index e04abdde75..cd32a5350a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -83,6 +83,7 @@ public static AdminStates fromValue(final String value) { } protected AdminStates adminState; + private long maintenanceExpireTimeInMS; public DatanodeInfo(DatanodeInfo from) { super(from); @@ -499,17 +500,28 @@ public void setDecommissioned() { } /** - * Put a node to maintenance mode. + * Start the maintenance operation. */ public void startMaintenance() { - adminState = AdminStates.ENTERING_MAINTENANCE; + this.adminState = AdminStates.ENTERING_MAINTENANCE; } /** - * Put a node to maintenance mode. + * Put a node directly into maintenance mode. */ public void setInMaintenance() { - adminState = AdminStates.IN_MAINTENANCE; + this.adminState = AdminStates.IN_MAINTENANCE; + } + + /** + * @param maintenanceExpireTimeInMS the time, in milliseconds, until which + * the DataNode remains in maintenance mode. + */ + public void setMaintenanceExpireTimeInMS(long maintenanceExpireTimeInMS) { + this.maintenanceExpireTimeInMS = maintenanceExpireTimeInMS; + } + + public long getMaintenanceExpireTimeInMS() { + return this.maintenanceExpireTimeInMS; } /** @@ -519,6 +531,9 @@ public void stopMaintenance() { adminState = null; } + public static boolean maintenanceNotExpired(long maintenanceExpireTimeInMS) { + return Time.monotonicNow() < maintenanceExpireTimeInMS; + } /** * Returns true if the node is entering_maintenance */ @@ -541,6 +556,10 @@ public boolean isMaintenance() { adminState == AdminStates.IN_MAINTENANCE); } + public boolean maintenanceExpired() { + return !maintenanceNotExpired(this.maintenanceExpireTimeInMS); + } + public boolean isInService() { return getAdminState() == AdminStates.NORMAL; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 8df2d54964..acbc8f674b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -141,7 +141,7 @@ public static RollingUpgradeAction fromString(String s) { // type of the datanode report public enum DatanodeReportType { - ALL, LIVE, DEAD, DECOMMISSIONING + ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE } public static final byte RS_6_3_POLICY_ID = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java index 3e913b93a2..6f9c35e0fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java +++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java @@ -148,6 +148,24 @@ public boolean apply(java.util.Map.Entry datanode = Iterables.filter( + allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return input.getAdminState().equals( + AdminStates.IN_MAINTENANCE) && + (input.getPort() == 0 || + input.getPort() == address.getPort()); + } + }); + // if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS + // set in the config. + return datanode.iterator().hasNext() ? + datanode.iterator().next().getMaintenanceExpireTimeInMS() : 0; + } + static class HostIterator extends UnmodifiableIterator { private final Iterator> it; @@ -236,6 +254,11 @@ public synchronized String getUpgradeDomain(final DatanodeID dn) { return hostProperties.getUpgradeDomain(dn.getResolvedAddress()); } + @Override + public long getMaintenanceExpirationTimeInMS(DatanodeID dn) { + return hostProperties.getMaintenanceExpireTimeInMS(dn.getResolvedAddress()); + } + /** * Set the properties lists by the new instances. The * old instance is discarded. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index da02a9035b..fffe29c61d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -552,7 +552,7 @@ DatanodeDescriptor getDatanodeDescriptor(String address) { /** Get a datanode descriptor given corresponding DatanodeUUID */ - DatanodeDescriptor getDatanode(final String datanodeUuid) { + public DatanodeDescriptor getDatanode(final String datanodeUuid) { if (datanodeUuid == null) { return null; } @@ -902,10 +902,14 @@ private static void removeDecomNodeFromList( * * @param nodeReg datanode */ - void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) { + void startAdminOperationIfNecessary(DatanodeDescriptor nodeReg) { + long maintenanceExpireTimeInMS = + hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg); // If the registered node is in exclude list, then decommission it if (getHostConfigManager().isExcluded(nodeReg)) { decomManager.startDecommission(nodeReg); + } else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) { + decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS); } } @@ -1017,7 +1021,7 @@ nodes with its data cleared (or user can just remove the StorageID // also treat the registration message as a heartbeat heartbeatManager.register(nodeS); incrementVersionCount(nodeS.getSoftwareVersion()); - startDecommissioningIfExcluded(nodeS); + startAdminOperationIfNecessary(nodeS); success = true; } finally { if (!success) { @@ -1056,7 +1060,7 @@ nodes with its data cleared (or user can just remove the StorageID heartbeatManager.addDatanode(nodeDescr); heartbeatManager.updateDnStat(nodeDescr); incrementVersionCount(nodeReg.getSoftwareVersion()); - startDecommissioningIfExcluded(nodeDescr); + startAdminOperationIfNecessary(nodeDescr); success = true; } finally { if (!success) { @@ -1122,9 +1126,14 @@ private void refreshDatanodes() { if (!hostConfigManager.isIncluded(node)) { node.setDisallowed(true); // case 2. 
} else { - if (hostConfigManager.isExcluded(node)) { + long maintenanceExpireTimeInMS = + hostConfigManager.getMaintenanceExpirationTimeInMS(node); + if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) { + decomManager.startMaintenance(node, maintenanceExpireTimeInMS); + } else if (hostConfigManager.isExcluded(node)) { decomManager.startDecommission(node); // case 3. } else { + decomManager.stopMaintenance(node); decomManager.stopDecommission(node); // case 4. } } @@ -1157,7 +1166,12 @@ public List getDecommissioningNodes() { // A decommissioning DN may be "alive" or "dead". return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING); } - + + /** @return list of datanodes that are entering maintenance. */ + public List getEnteringMaintenanceNodes() { + return getDatanodeListForReport(DatanodeReportType.ENTERING_MAINTENANCE); + } + /* Getter and Setter for stale DataNodes related attributes */ /** @@ -1342,6 +1356,9 @@ public List getDatanodeListForReport( final boolean listDecommissioningNodes = type == DatanodeReportType.ALL || type == DatanodeReportType.DECOMMISSIONING; + final boolean listEnteringMaintenanceNodes = + type == DatanodeReportType.ALL || + type == DatanodeReportType.ENTERING_MAINTENANCE; ArrayList nodes; final HostSet foundNodes = new HostSet(); @@ -1353,10 +1370,12 @@ public List getDatanodeListForReport( for (DatanodeDescriptor dn : datanodeMap.values()) { final boolean isDead = isDatanodeDead(dn); final boolean isDecommissioning = dn.isDecommissionInProgress(); + final boolean isEnteringMaintenance = dn.isEnteringMaintenance(); if (((listLiveNodes && !isDead) || (listDeadNodes && isDead) || - (listDecommissioningNodes && isDecommissioning)) && + (listDecommissioningNodes && isDecommissioning) || + (listEnteringMaintenanceNodes && isEnteringMaintenance)) && hostConfigManager.isIncluded(dn)) { nodes.add(dn); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java index bcc9bba3db..0d4e235d80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java @@ -47,7 +47,7 @@ class DatanodeStats { synchronized void add(final DatanodeDescriptor node) { xceiverCount += node.getXceiverCount(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityUsed += node.getDfsUsed(); blockPoolUsed += node.getBlockPoolUsed(); nodesInService++; @@ -56,7 +56,8 @@ synchronized void add(final DatanodeDescriptor node) { capacityRemaining += node.getRemaining(); cacheCapacity += node.getCacheCapacity(); cacheUsed += node.getCacheUsed(); - } else if (!node.isDecommissioned()) { + } else if (node.isDecommissionInProgress() || + node.isEnteringMaintenance()) { cacheCapacity += node.getCacheCapacity(); cacheUsed += node.getCacheUsed(); } @@ -74,7 +75,7 @@ synchronized void add(final DatanodeDescriptor node) { synchronized void subtract(final DatanodeDescriptor node) { xceiverCount -= node.getXceiverCount(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityUsed -= node.getDfsUsed(); blockPoolUsed -= node.getBlockPoolUsed(); nodesInService--; @@ -83,7 +84,8 @@ synchronized void subtract(final DatanodeDescriptor node) { 
capacityRemaining -= node.getRemaining(); cacheCapacity -= node.getCacheCapacity(); cacheUsed -= node.getCacheUsed(); - } else if (!node.isDecommissioned()) { + } else if (node.isDecommissionInProgress() || + node.isEnteringMaintenance()) { cacheCapacity -= node.getCacheCapacity(); cacheUsed -= node.getCacheUsed(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index ec6d9ba179..c456aba560 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -86,8 +86,11 @@ public class DecommissionManager { private final ScheduledExecutorService executor; /** - * Map containing the decommission-in-progress datanodes that are being - * tracked so they can be be marked as decommissioned. + * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE + * datanodes that are being tracked so they can be marked as + * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as + * IN_MAINTENANCE, the node remains in the map until its + * maintenance expires, which is checked during a monitor tick. + * <p/>
* This holds a set of references to the under-replicated blocks on the DN at * the time the DN is added to the map, i.e. the blocks that are preventing @@ -102,12 +105,12 @@ public class DecommissionManager { * another check is done with the actual block map. */ private final TreeMap> - decomNodeBlocks; + outOfServiceNodeBlocks; /** - * Tracking a node in decomNodeBlocks consumes additional memory. To limit - * the impact on NN memory consumption, we limit the number of nodes in - * decomNodeBlocks. Additional nodes wait in pendingNodes. + * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To + * limit the impact on NN memory consumption, we limit the number of nodes in + * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes. */ private final Queue pendingNodes; @@ -122,7 +125,7 @@ public class DecommissionManager { executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d") .setDaemon(true).build()); - decomNodeBlocks = new TreeMap<>(); + outOfServiceNodeBlocks = new TreeMap<>(); pendingNodes = new LinkedList<>(); } @@ -222,13 +225,56 @@ public void stopDecommission(DatanodeDescriptor node) { } // Remove from tracking in DecommissionManager pendingNodes.remove(node); - decomNodeBlocks.remove(node); + outOfServiceNodeBlocks.remove(node); } else { LOG.trace("stopDecommission: Node {} in {}, nothing to do." + node, node.getAdminState()); } } + /** + * Start maintenance of the specified datanode. + * @param node + */ + @VisibleForTesting + public void startMaintenance(DatanodeDescriptor node, + long maintenanceExpireTimeInMS) { + // Even if the node is already in maintenance, we still need to adjust + // the expiration time. + node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS); + if (!node.isMaintenance()) { + // Update DN stats maintained by HeartbeatManager + hbManager.startMaintenance(node); + pendingNodes.add(node); + } else { + LOG.trace("startMaintenance: Node {} in {}, nothing to do." + + node, node.getAdminState()); + } + } + + + /** + * Stop maintenance of the specified datanode. + * @param node + */ + @VisibleForTesting + public void stopMaintenance(DatanodeDescriptor node) { + if (node.isMaintenance()) { + // Update DN stats maintained by HeartbeatManager + hbManager.stopMaintenance(node); + + // TODO HDFS-9390 remove replicas from block maps + // or handle over replicated blocks. + + // Remove from tracking in DecommissionManager + pendingNodes.remove(node); + outOfServiceNodeBlocks.remove(node); + } else { + LOG.trace("stopMaintenance: Node {} in {}, nothing to do." + + node, node.getAdminState()); + } + } + private void setDecommissioned(DatanodeDescriptor dn) { dn.setDecommissioned(); LOG.info("Decommissioning complete for node {}", dn); @@ -313,7 +359,7 @@ public int getNumPendingNodes() { @VisibleForTesting public int getNumTrackedNodes() { - return decomNodeBlocks.size(); + return outOfServiceNodeBlocks.size(); } @VisibleForTesting @@ -333,8 +379,8 @@ private class Monitor implements Runnable { */ private final int numBlocksPerCheck; /** - * The maximum number of nodes to track in decomNodeBlocks. A value of 0 - * means no limit. + * The maximum number of nodes to track in outOfServiceNodeBlocks. + * A value of 0 means no limit. 
*/ private final int maxConcurrentTrackedNodes; /** @@ -347,7 +393,7 @@ private class Monitor implements Runnable { */ private int numNodesChecked = 0; /** - * The last datanode in decomNodeBlocks that we've processed + * The last datanode in outOfServiceNodeBlocks that we've processed */ private DatanodeDescriptor iterkey = new DatanodeDescriptor(new DatanodeID("", "", "", 0, 0, 0, 0)); @@ -393,14 +439,15 @@ public void run() { private void processPendingNodes() { while (!pendingNodes.isEmpty() && (maxConcurrentTrackedNodes == 0 || - decomNodeBlocks.size() < maxConcurrentTrackedNodes)) { - decomNodeBlocks.put(pendingNodes.poll(), null); + outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) { + outOfServiceNodeBlocks.put(pendingNodes.poll(), null); } } private void check() { final Iterator>> - it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator(); + it = new CyclicIteration<>(outOfServiceNodeBlocks, + iterkey).iterator(); final LinkedList toRemove = new LinkedList<>(); while (it.hasNext() && !exceededNumBlocksPerCheck()) { @@ -410,6 +457,17 @@ private void check() { final DatanodeDescriptor dn = entry.getKey(); AbstractList blocks = entry.getValue(); boolean fullScan = false; + if (dn.isMaintenance()) { + // TODO HDFS-9390 make sure blocks are minimally replicated + // before transitioning the node to IN_MAINTENANCE state. + + // If maintenance expires, stop tracking it. + if (dn.maintenanceExpired()) { + stopMaintenance(dn); + toRemove.add(dn); + } + continue; + } if (blocks == null) { // This is a newly added datanode, run through its list to schedule // under-replicated blocks for replication and collect the blocks @@ -417,7 +475,7 @@ private void check() { LOG.debug("Newly-added node {}, doing full scan to find " + "insufficiently-replicated blocks.", dn); blocks = handleInsufficientlyStored(dn); - decomNodeBlocks.put(dn, blocks); + outOfServiceNodeBlocks.put(dn, blocks); fullScan = true; } else { // This is a known datanode, check if its # of insufficiently @@ -436,7 +494,7 @@ private void check() { LOG.debug("Node {} has finished replicating current set of " + "blocks, checking with the full block map.", dn); blocks = handleInsufficientlyStored(dn); - decomNodeBlocks.put(dn, blocks); + outOfServiceNodeBlocks.put(dn, blocks); } // If the full scan is clean AND the node liveness is okay, // we can finally mark as decommissioned. @@ -460,11 +518,12 @@ private void check() { } iterkey = dn; } - // Remove the datanodes that are decommissioned + // Remove the datanodes that are decommissioned or in service after + // maintenance expiration. 
for (DatanodeDescriptor dn : toRemove) { - Preconditions.checkState(dn.isDecommissioned(), - "Removing a node that is not yet decommissioned!"); - decomNodeBlocks.remove(dn); + Preconditions.checkState(dn.isDecommissioned() || dn.isInService(), + "Removing a node that is not yet decommissioned or in service!"); + outOfServiceNodeBlocks.remove(dn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index cec4a1ad2c..d728ee2c9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -265,6 +265,33 @@ synchronized void startDecommission(final DatanodeDescriptor node) { } } + synchronized void startMaintenance(final DatanodeDescriptor node) { + if (!node.isAlive()) { + LOG.info("Dead node {} is put in maintenance state immediately.", node); + node.setInMaintenance(); + } else if (node.isDecommissioned()) { + LOG.info("Decommissioned node " + node + " is put in maintenance state" + + " immediately."); + node.setInMaintenance(); + } else { + stats.subtract(node); + node.startMaintenance(); + stats.add(node); + } + } + + synchronized void stopMaintenance(final DatanodeDescriptor node) { + LOG.info("Stopping maintenance of {} node {}", + node.isAlive() ? "live" : "dead", node); + if (!node.isAlive()) { + node.stopMaintenance(); + } else { + stats.subtract(node); + node.stopMaintenance(); + stats.add(node); + } + } + synchronized void stopDecommission(final DatanodeDescriptor node) { LOG.info("Stopping decommissioning of {} node {}", node.isAlive() ? "live" : "dead", node); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java index f28ed2997a..0ab4ebc180 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java @@ -77,4 +77,11 @@ public abstract class HostConfigManager implements Configurable { * @return the upgrade domain of dn. */ public abstract String getUpgradeDomain(DatanodeID dn); + + /** + * Get the maintenance expiration time in milliseconds. + * @param dn the DatanodeID of the datanode + * @return the maintenance expiration time of dn.
+ */ + public abstract long getMaintenanceExpirationTimeInMS(DatanodeID dn); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java index bcfebf25de..59f907fe08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java @@ -138,6 +138,12 @@ public synchronized String getUpgradeDomain(final DatanodeID dn) { return null; } + @Override + public long getMaintenanceExpirationTimeInMS(DatanodeID dn) { + // The include/exclude files based config doesn't support maintenance mode. + return 0; + } + /** * Read the includes and excludes lists from the named files. Any previous * includes and excludes lists are discarded. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 52fbaa7c7f..f4b742e050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -7079,5 +7079,34 @@ public long getBytesInFuture() { return blockManager.getBytesInFuture(); } + + @Override // FSNamesystemMBean + public int getNumInMaintenanceLiveDataNodes() { + final List live = new ArrayList(); + getBlockManager().getDatanodeManager().fetchDatanodes(live, null, true); + int liveInMaintenance = 0; + for (DatanodeDescriptor node : live) { + liveInMaintenance += node.isInMaintenance() ? 1 : 0; + } + return liveInMaintenance; + } + + @Override // FSNamesystemMBean + public int getNumInMaintenanceDeadDataNodes() { + final List dead = new ArrayList(); + getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, true); + int deadInMaintenance = 0; + for (DatanodeDescriptor node : dead) { + deadInMaintenance += node.isInMaintenance() ? 1 : 0; + } + return deadInMaintenance; + } + + @Override // FSNamesystemMBean + public int getNumEnteringMaintenanceDataNodes() { + return getBlockManager().getDatanodeManager().getEnteringMaintenanceNodes() + .size(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java index b314f7f8f8..f1e7515754 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java @@ -208,4 +208,19 @@ public interface FSNamesystemMBean { * Return total time spent doing sync operations on FSEditLog. 
*/ String getTotalSyncTimes(); + + /** + * @return Number of IN_MAINTENANCE live data nodes + */ + int getNumInMaintenanceLiveDataNodes(); + + /** + * @return Number of IN_MAINTENANCE dead data nodes + */ + int getNumInMaintenanceDeadDataNodes(); + + /** + * @return Number of ENTERING_MAINTENANCE data nodes + */ + int getNumEnteringMaintenanceDataNodes(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java new file mode 100644 index 0000000000..0698628bbe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.HostsFileWriter; +import org.junit.After; +import org.junit.Before; + +/** + * This class provides utilities for testing the admin operations of nodes.
+ */ +public class AdminStatesBaseTest { + public static final Log LOG = LogFactory.getLog(AdminStatesBaseTest.class); + static final long seed = 0xDEADBEEFL; + static final int blockSize = 8192; + static final int fileSize = 16384; + static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds + static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec + static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval + + final private Random myrand = new Random(); + + private HostsFileWriter hostsFileWriter; + private Configuration conf; + private MiniDFSCluster cluster = null; + private boolean useCombinedHostFileManager = false; + + protected void setUseCombinedHostFileManager() { + useCombinedHostFileManager = true; + } + + protected Configuration getConf() { + return conf; + } + + protected MiniDFSCluster getCluster() { + return cluster; + } + + @Before + public void setup() throws IOException { + // Set up the hosts/exclude files. + hostsFileWriter = new HostsFileWriter(); + conf = new HdfsConfiguration(); + + if (useCombinedHostFileManager) { + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + CombinedHostFileManager.class, HostConfigManager.class); + } + + // Setup conf + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 200); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); + conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + BLOCKREPORT_INTERVAL_MSEC); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, + NAMENODE_REPLICATION_INTERVAL); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); + + hostsFileWriter.initialize(conf, "temp/admin"); + } + + @After + public void teardown() throws IOException { + hostsFileWriter.cleanup(); + shutdownCluster(); + } + + protected void writeFile(FileSystem fileSys, Path name, int repl) + throws IOException { + writeFile(fileSys, name, repl, 2); + } + + protected void writeFile(FileSystem fileSys, Path name, int repl, + int numOfBlocks) throws IOException { + writeFile(fileSys, name, repl, numOfBlocks, true); + } + + protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, + int repl, int numOfBlocks, boolean completeFile) + throws IOException { + // create and write a file that contains numOfBlocks blocks of data + FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() + .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + (short) repl, blockSize); + byte[] buffer = new byte[blockSize*numOfBlocks]; + Random rand = new Random(seed); + rand.nextBytes(buffer); + stm.write(buffer); + LOG.info("Created file " + name + " with " + repl + " replicas."); + if (completeFile) { + stm.close(); + return null; + } else { + // Do not close stream, return it + // so that it is not garbage collected + return stm; + } + } + + /* + * Decommission the DN or put the DN into maintenance for datanodeUuid, or + * one random node if datanodeUuid is null, and wait for the node to reach + * the given {@code waitForState}.
+ */ + protected DatanodeInfo takeNodeOutofService(int nnIndex, + String datanodeUuid, long maintenanceExpirationInMS, + ArrayList decommissionedNodes, + AdminStates waitForState) throws IOException { + return takeNodeOutofService(nnIndex, datanodeUuid, + maintenanceExpirationInMS, decommissionedNodes, null, waitForState); + } + + /* + * Decommission the DN or put the DN into maintenance as specified by + * datanodeUuid; pick a random node if datanodeUuid == null. Then wait for + * the node to reach the given {@code waitForState}. + */ + protected DatanodeInfo takeNodeOutofService(int nnIndex, + String datanodeUuid, long maintenanceExpirationInMS, + List decommissionedNodes, + Map inMaintenanceNodes, AdminStates waitForState) + throws IOException { + DFSClient client = getDfsClient(nnIndex); + DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL); + boolean isDecommissionRequest = + waitForState == AdminStates.DECOMMISSION_INPROGRESS || + waitForState == AdminStates.DECOMMISSIONED; + + // + // pick one datanode randomly unless the caller specifies one. + // + int index = 0; + if (datanodeUuid == null) { + boolean found = false; + while (!found) { + index = myrand.nextInt(info.length); + if ((isDecommissionRequest && !info[index].isDecommissioned()) || + (!isDecommissionRequest && !info[index].isInMaintenance())) { + found = true; + } + } + } else { + // The caller specifies a DN + for (; index < info.length; index++) { + if (info[index].getDatanodeUuid().equals(datanodeUuid)) { + break; + } + } + if (index == info.length) { + throw new IOException("invalid datanodeUuid " + datanodeUuid); + } + } + String nodename = info[index].getXferAddr(); + LOG.info("Taking node: " + nodename + " out of service"); + + ArrayList decommissionNodes = new ArrayList(); + if (decommissionedNodes != null) { + for (DatanodeInfo dn : decommissionedNodes) { + decommissionNodes.add(dn.getName()); + } + } + Map maintenanceNodes = new HashMap<>(); + if (inMaintenanceNodes != null) { + for (Map.Entry dn : + inMaintenanceNodes.entrySet()) { + maintenanceNodes.put(dn.getKey().getName(), dn.getValue()); + } + } + + if (isDecommissionRequest) { + decommissionNodes.add(nodename); + } else { + maintenanceNodes.put(nodename, maintenanceExpirationInMS); + } + + // write node names into the json host file. + hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes); + refreshNodes(nnIndex); + DatanodeInfo ret = NameNodeAdapter.getDatanode( + cluster.getNamesystem(nnIndex), info[index]); + waitNodeState(ret, waitForState); + return ret; + } + + /* Ask a specific NN to put the datanode in service and wait for it * to reach the NORMAL state.
+ */ + protected void putNodeInService(int nnIndex, + DatanodeInfo outOfServiceNode) throws IOException { + LOG.info("Putting node: " + outOfServiceNode + " in service"); + ArrayList decommissionNodes = new ArrayList<>(); + Map maintenanceNodes = new HashMap<>(); + + DatanodeManager dm = + cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager(); + List nodes = + dm.getDatanodeListForReport(DatanodeReportType.ALL); + for (DatanodeDescriptor node : nodes) { + if (node.isMaintenance()) { + maintenanceNodes.put(node.getName(), + node.getMaintenanceExpireTimeInMS()); + } else if (node.isDecommissionInProgress() || node.isDecommissioned()) { + decommissionNodes.add(node.getName()); + } + } + decommissionNodes.remove(outOfServiceNode.getName()); + maintenanceNodes.remove(outOfServiceNode.getName()); + + hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes); + refreshNodes(nnIndex); + waitNodeState(outOfServiceNode, AdminStates.NORMAL); + } + + protected void putNodeInService(int nnIndex, + String datanodeUuid) throws IOException { + DatanodeInfo datanodeInfo = + getDatanodeDesriptor(cluster.getNamesystem(nnIndex), datanodeUuid); + putNodeInService(nnIndex, datanodeInfo); + } + + /* + * Wait till node is transitioned to the expected state. + */ + protected void waitNodeState(DatanodeInfo node, + AdminStates state) { + boolean done = state == node.getAdminState(); + while (!done) { + LOG.info("Waiting for node " + node + " to change state to " + + state + " current state: " + node.getAdminState()); + try { + Thread.sleep(HEARTBEAT_INTERVAL * 500); + } catch (InterruptedException e) { + // nothing + } + done = state == node.getAdminState(); + } + LOG.info("node " + node + " reached the state " + state); + } + + protected void initIncludeHost(String hostNameAndPort) throws IOException { + hostsFileWriter.initIncludeHost(hostNameAndPort); + } + + protected void initIncludeHosts(String[] hostNameAndPorts) + throws IOException { + hostsFileWriter.initIncludeHosts(hostNameAndPorts); + } + + protected void initExcludeHost(String hostNameAndPort) throws IOException { + hostsFileWriter.initExcludeHost(hostNameAndPort); + } + + protected void initExcludeHosts(List hostNameAndPorts) + throws IOException { + hostsFileWriter.initExcludeHosts(hostNameAndPorts); + } + + /* Get DFSClient to the namenode */ + protected DFSClient getDfsClient(final int nnIndex) throws IOException { + return new DFSClient(cluster.getNameNode(nnIndex).getNameNodeAddress(), + conf); + } + + /* Validate cluster has expected number of datanodes */ + protected static void validateCluster(DFSClient client, int numDNs) + throws IOException { + DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); + assertEquals("Number of Datanodes ", numDNs, info.length); + } + + /** Start a MiniDFSCluster. 
+ * @throws IOException */ + protected void startCluster(int numNameNodes, int numDatanodes, + boolean setupHostsFile, long[] nodesCapacity, + boolean checkDataNodeHostConfig) throws IOException { + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) + .numDataNodes(numDatanodes); + if (setupHostsFile) { + builder.setupHostsFile(setupHostsFile); + } + if (nodesCapacity != null) { + builder.simulatedCapacities(nodesCapacity); + } + if (checkDataNodeHostConfig) { + builder.checkDataNodeHostConfig(checkDataNodeHostConfig); + } + cluster = builder.build(); + cluster.waitActive(); + for (int i = 0; i < numNameNodes; i++) { + DFSClient client = getDfsClient(i); + validateCluster(client, numDatanodes); + } + } + + protected void startCluster(int numNameNodes, int numDatanodes) + throws IOException { + startCluster(numNameNodes, numDatanodes, false, null, false); + } + + protected void startSimpleHACluster(int numDatanodes) throws IOException { + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes( + numDatanodes).build(); + cluster.transitionToActive(0); + cluster.waitActive(); + } + + protected void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + protected void refreshNodes(final int nnIndex) throws IOException { + cluster.getNamesystem(nnIndex).getBlockManager().getDatanodeManager(). + refreshNodes(conf); + } + + protected DatanodeDescriptor getDatanodeDesriptor( + final FSNamesystem ns, final String datanodeUuid) { + return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); + } + + protected void cleanupFile(FileSystem fileSys, Path name) throws IOException { + assertTrue(fileSys.exists(name)); + fileSys.delete(name, true); + assertTrue(!fileSys.exists(name)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index f6b5d8f360..ddb8237699 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -26,17 +26,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -64,11 +60,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.mortbay.util.ajax.JSON; @@ -78,90 +71,9 @@ /** * This class tests the decommissioning of nodes. 
*/ -public class TestDecommission { +public class TestDecommission extends AdminStatesBaseTest { public static final Logger LOG = LoggerFactory.getLogger(TestDecommission .class); - static final long seed = 0xDEADBEEFL; - static final int blockSize = 8192; - static final int fileSize = 16384; - static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds - static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec - static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval - - final Random myrand = new Random(); - Path dir; - Path hostsFile; - Path excludeFile; - FileSystem localFileSys; - Configuration conf; - MiniDFSCluster cluster = null; - - @Before - public void setup() throws IOException { - conf = new HdfsConfiguration(); - // Set up the hosts/exclude files. - localFileSys = FileSystem.getLocal(conf); - Path workingDir = localFileSys.getWorkingDirectory(); - dir = new Path(workingDir, PathUtils.getTestDirName(getClass()) + "/work-dir/decommission"); - hostsFile = new Path(dir, "hosts"); - excludeFile = new Path(dir, "exclude"); - - // Setup conf - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); - conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL); - - writeConfigFile(hostsFile, null); - writeConfigFile(excludeFile, null); - } - - @After - public void teardown() throws IOException { - cleanupFile(localFileSys, dir); - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - private void writeConfigFile(Path name, List nodes) - throws IOException { - // delete if it already exists - if (localFileSys.exists(name)) { - localFileSys.delete(name, true); - } - - FSDataOutputStream stm = localFileSys.create(name); - - if (nodes != null) { - for (Iterator it = nodes.iterator(); it.hasNext();) { - String node = it.next(); - stm.writeBytes(node); - stm.writeBytes("\n"); - } - } - stm.close(); - } - - private void writeFile(FileSystem fileSys, Path name, int repl) - throws IOException { - // create and write a file that contains three blocks of data - FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() - .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), - (short) repl, blockSize); - byte[] buffer = new byte[fileSize]; - Random rand = new Random(seed); - rand.nextBytes(buffer); - stm.write(buffer); - stm.close(); - LOG.info("Created file " + name + " with " + repl + " replicas."); - } /** * Verify that the number of replicas are as expected for each block in @@ -223,128 +135,6 @@ private static String checkFile(FileSystem fileSys, Path name, int repl, return null; } - private void cleanupFile(FileSystem fileSys, Path name) throws IOException { - assertTrue(fileSys.exists(name)); - fileSys.delete(name, true); - assertTrue(!fileSys.exists(name)); - } - - /* - * decommission the DN at index dnIndex or one random node if dnIndex is set - * to -1 and wait for the node to reach the given {@code 
waitForState}. - */ - private DatanodeInfo decommissionNode(int nnIndex, - String datanodeUuid, - ArrayListdecommissionedNodes, - AdminStates waitForState) - throws IOException { - DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf); - DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); - - // - // pick one datanode randomly unless the caller specifies one. - // - int index = 0; - if (datanodeUuid == null) { - boolean found = false; - while (!found) { - index = myrand.nextInt(info.length); - if (!info[index].isDecommissioned()) { - found = true; - } - } - } else { - // The caller specifies a DN - for (; index < info.length; index++) { - if (info[index].getDatanodeUuid().equals(datanodeUuid)) { - break; - } - } - if (index == info.length) { - throw new IOException("invalid datanodeUuid " + datanodeUuid); - } - } - String nodename = info[index].getXferAddr(); - LOG.info("Decommissioning node: " + nodename); - - // write nodename into the exclude file. - ArrayList nodes = new ArrayList(); - if (decommissionedNodes != null) { - for (DatanodeInfo dn : decommissionedNodes) { - nodes.add(dn.getName()); - } - } - nodes.add(nodename); - writeConfigFile(excludeFile, nodes); - refreshNodes(cluster.getNamesystem(nnIndex), conf); - DatanodeInfo ret = NameNodeAdapter.getDatanode( - cluster.getNamesystem(nnIndex), info[index]); - waitNodeState(ret, waitForState); - return ret; - } - - /* Ask a specific NN to stop decommission of the datanode and wait for each - * to reach the NORMAL state. - */ - private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException { - LOG.info("Recommissioning node: " + decommissionedNode); - writeConfigFile(excludeFile, null); - refreshNodes(cluster.getNamesystem(nnIndex), conf); - waitNodeState(decommissionedNode, AdminStates.NORMAL); - - } - - /* - * Wait till node is fully decommissioned. 
- */ - private void waitNodeState(DatanodeInfo node, - AdminStates state) { - boolean done = state == node.getAdminState(); - while (!done) { - LOG.info("Waiting for node " + node + " to change state to " - + state + " current state: " + node.getAdminState()); - try { - Thread.sleep(HEARTBEAT_INTERVAL * 500); - } catch (InterruptedException e) { - // nothing - } - done = state == node.getAdminState(); - } - LOG.info("node " + node + " reached the state " + state); - } - - /* Get DFSClient to the namenode */ - private static DFSClient getDfsClient(NameNode nn, - Configuration conf) throws IOException { - return new DFSClient(nn.getNameNodeAddress(), conf); - } - - /* Validate cluster has expected number of datanodes */ - private static void validateCluster(DFSClient client, int numDNs) - throws IOException { - DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); - assertEquals("Number of Datanodes ", numDNs, info.length); - } - - /** Start a MiniDFSCluster - * @throws IOException */ - private void startCluster(int numNameNodes, int numDatanodes, - Configuration conf) throws IOException { - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) - .numDataNodes(numDatanodes).build(); - cluster.waitActive(); - for (int i = 0; i < numNameNodes; i++) { - DFSClient client = getDfsClient(cluster.getNameNode(i), conf); - validateCluster(client, numDatanodes); - } - } - - static void refreshNodes(final FSNamesystem ns, final Configuration conf - ) throws IOException { - ns.getBlockManager().getDatanodeManager().refreshNodes(conf); - } - private void verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning) throws InterruptedException, IOException { @@ -376,7 +166,7 @@ private void verifyStats(NameNode namenode, FSNamesystem fsn, public void testDecommission() throws IOException { testDecommission(1, 6); } - + /** * Tests decommission with replicas on the target datanode cannot be migrated * to other datanodes and satisfy the replication factor. Make sure the @@ -387,8 +177,8 @@ public void testDecommission2() throws IOException { LOG.info("Starting test testDecommission"); int numNamenodes = 1; int numDatanodes = 4; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); - startCluster(numNamenodes, numDatanodes, conf); + getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); + startCluster(numNamenodes, numDatanodes); ArrayList> namenodeDecomList = new ArrayList>( numNamenodes); @@ -399,8 +189,8 @@ public void testDecommission2() throws IOException { // Start decommissioning one namenode at a time ArrayList decommissionedNodes = namenodeDecomList.get(0); - FileSystem fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); writeFile(fileSys, file1, replicas); @@ -408,14 +198,14 @@ public void testDecommission2() throws IOException { int liveDecomissioned = ns.getNumDecomLiveDataNodes(); // Decommission one node. Verify that node is decommissioned. 
- DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(0, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); + DFSClient client = getDfsClient(0); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), @@ -424,9 +214,8 @@ public void testDecommission2() throws IOException { // Restart the cluster and ensure recommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(1, 4, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(1, 4); } /** @@ -449,26 +238,22 @@ public void testDecommissionFederation() throws IOException { */ @Test(timeout=360000) public void testDecommissionOnStandby() throws Exception { - Configuration hdfsConf = new HdfsConfiguration(conf); - hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000); - hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2); + getConf().setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 30000); + getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2); // The time to wait so that the slow DN's heartbeat is considered old // by BlockPlacementPolicyDefault and thus will choose that DN for // excess replica. long slowHeartbeatDNwaitTime = - hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt( - DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, + getConf().getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (getConf(). + getInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1); - cluster = new MiniDFSCluster.Builder(hdfsConf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build(); - - cluster.transitionToActive(0); - cluster.waitActive(); - + startSimpleHACluster(3); // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs. // The last DN is empty. Also configure the last DN to have slow heartbeat @@ -478,29 +263,29 @@ public void testDecommissionOnStandby() throws Exception { // same as # of DNs, each DN will have a replica for any block. Path file1 = new Path("testDecommissionHA.dat"); int replicas = 3; - FileSystem activeFileSys = cluster.getFileSystem(0); + FileSystem activeFileSys = getCluster().getFileSystem(0); writeFile(activeFileSys, file1, replicas); - HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), - cluster.getNameNode(1)); + HATestUtil.waitForStandbyToCatchUp(getCluster().getNameNode(0), + getCluster().getNameNode(1)); // Step 1.b, start a DN with slow heartbeat, so that we can know for sure it // will be chosen as the target of excess replica during recommission. 
- hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); - cluster.startDataNodes(hdfsConf, 1, true, null, null, null); - DataNode lastDN = cluster.getDataNodes().get(3); + getConf().setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); + getCluster().startDataNodes(getConf(), 1, true, null, null, null); + DataNode lastDN = getCluster().getDataNodes().get(3); lastDN.getDatanodeUuid(); // Step 2, decommission the first DN at both ANN and SBN. - DataNode firstDN = cluster.getDataNodes().get(0); + DataNode firstDN = getCluster().getDataNodes().get(0); // Step 2.a, ask ANN to decomm the first DN - DatanodeInfo decommissionedNodeFromANN = decommissionNode( - 0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED); + DatanodeInfo decommissionedNodeFromANN = takeNodeOutofService( + 0, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Step 2.b, ask SBN to decomm the first DN - DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNodeFromSBN = takeNodeOutofService(1, + firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Step 3, recommission the first DN on SBN and ANN to create excess replica // It recommissions the node on SBN first to create potential @@ -520,7 +305,7 @@ public void testDecommissionOnStandby() throws Exception { // After the fix, // After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 ) Thread.sleep(slowHeartbeatDNwaitTime); - recommissionNode(1, decomNodeFromSBN); + putNodeInService(1, decomNodeFromSBN); // Step 3.b, ask ANN to recommission the first DN. // To verify the fix, the test makes sure the excess replica picked by ANN @@ -529,41 +314,41 @@ public void testDecommissionOnStandby() throws Exception { // by ANN. // 1. restore LastDNprop's heartbeat interval. // 2. Make next-to-last DN's heartbeat slow. 
- MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3); - LastDNprop.conf.setLong( + MiniDFSCluster.DataNodeProperties lastDNprop = + getCluster().stopDataNode(3); + lastDNprop.conf.setLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); - cluster.restartDataNode(LastDNprop); + getCluster().restartDataNode(lastDNprop); - MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2); - nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); - cluster.restartDataNode(nextToLastDNprop); - cluster.waitActive(); + MiniDFSCluster.DataNodeProperties nextToLastDNprop = + getCluster().stopDataNode(2); + nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + 30); + getCluster().restartDataNode(nextToLastDNprop); + getCluster().waitActive(); Thread.sleep(slowHeartbeatDNwaitTime); - recommissionNode(0, decommissionedNodeFromANN); + putNodeInService(0, decommissionedNodeFromANN); // Step 3.c, make sure the DN has deleted the block and report to NNs - cluster.triggerHeartbeats(); - HATestUtil.waitForDNDeletions(cluster); - cluster.triggerDeletionReports(); + getCluster().triggerHeartbeats(); + HATestUtil.waitForDNDeletions(getCluster()); + getCluster().triggerDeletionReports(); // Step 4, decommission the first DN on both ANN and SBN // With the fix to make sure SBN no longer marks excess replica // during recommission, SBN's decommission can finish properly - decommissionNode(0, firstDN.getDatanodeUuid(), null, + takeNodeOutofService(0, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Ask SBN to decomm the first DN - decommissionNode(1, firstDN.getDatanodeUuid(), null, + takeNodeOutofService(1, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); - - cluster.shutdown(); - } private void testDecommission(int numNamenodes, int numDatanodes) throws IOException { LOG.info("Starting test testDecommission"); - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); ArrayList> namenodeDecomList = new ArrayList>(numNamenodes); @@ -577,8 +362,8 @@ private void testDecommission(int numNamenodes, int numDatanodes) // Start decommissioning one namenode at a time for (int i = 0; i < numNamenodes; i++) { ArrayList decommissionedNodes = namenodeDecomList.get(i); - FileSystem fileSys = cluster.getFileSystem(i); - FSNamesystem ns = cluster.getNamesystem(i); + FileSystem fileSys = getCluster().getFileSystem(i); + FSNamesystem ns = getCluster().getNamesystem(i); writeFile(fileSys, file1, replicas); @@ -586,14 +371,14 @@ private void testDecommission(int numNamenodes, int numDatanodes) int liveDecomissioned = ns.getNumDecomLiveDataNodes(); // Decommission one node. Verify that node is decommissioned. 
- DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(i, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(i), conf); + DFSClient client = getDfsClient(i); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); // wait for the block to be replicated @@ -616,9 +401,8 @@ private void testDecommission(int numNamenodes, int numDatanodes) // Restart the cluster and ensure decommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(numNamenodes, numDatanodes, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(numNamenodes, numDatanodes); } /** @@ -630,13 +414,13 @@ public void testRecommission() throws Exception { try { LOG.info("Starting test testRecommission"); - startCluster(1, numDatanodes, conf); + startCluster(1, numDatanodes); final Path file1 = new Path("testDecommission.dat"); final int replicas = numDatanodes - 1; ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); - final FileSystem fileSys = cluster.getFileSystem(); + final FileSystem fileSys = getCluster().getFileSystem(); // Write a file to n-1 datanodes writeFile(fileSys, file1, replicas); @@ -647,25 +431,24 @@ public void testRecommission() throws Exception { replicas, loc.getHosts().length); final String toDecomHost = loc.getNames()[0]; String toDecomUuid = null; - for (DataNode d : cluster.getDataNodes()) { + for (DataNode d : getCluster().getDataNodes()) { if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) { toDecomUuid = d.getDatanodeId().getDatanodeUuid(); break; } } assertNotNull("Could not find a dn with the block!", toDecomUuid); - final DatanodeInfo decomNode = - decommissionNode(0, toDecomUuid, decommissionedNodes, - AdminStates.DECOMMISSIONED); + final DatanodeInfo decomNode = takeNodeOutofService(0, toDecomUuid, + 0, decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); final BlockManager blockManager = - cluster.getNamesystem().getBlockManager(); + getCluster().getNamesystem().getBlockManager(); final DatanodeManager datanodeManager = blockManager.getDatanodeManager(); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); @@ -692,15 +475,13 @@ public Boolean get() { }, 500, 30000); // redecommission and wait for over-replication to be fixed - recommissionNode(0, decomNode); + putNodeInService(0, decomNode); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); - DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0); + DFSTestUtil.waitForReplication(getCluster(), b, 1, replicas, 0); cleanupFile(fileSys, file1); } finally { - if (cluster != null) { - cluster.shutdown(); - } + shutdownCluster(); } } @@ -726,35 +507,33 @@ public void testClusterStats(int numNameNodes) throws IOException, InterruptedException { LOG.info("Starting test testClusterStats"); int numDatanodes = 1; -
startCluster(numNameNodes, numDatanodes, conf); + startCluster(numNameNodes, numDatanodes); for (int i = 0; i < numNameNodes; i++) { - FileSystem fileSys = cluster.getFileSystem(i); + FileSystem fileSys = getCluster().getFileSystem(i); Path file = new Path("testClusterStats.dat"); writeFile(fileSys, file, 1); - FSNamesystem fsn = cluster.getNamesystem(i); - NameNode namenode = cluster.getNameNode(i); + FSNamesystem fsn = getCluster().getNamesystem(i); + NameNode namenode = getCluster().getNameNode(i); - DatanodeInfo decomInfo = decommissionNode(i, null, null, + DatanodeInfo decomInfo = takeNodeOutofService(i, null, 0, null, AdminStates.DECOMMISSION_INPROGRESS); DataNode decomNode = getDataNode(decomInfo); // Check namenode stats for multiple datanode heartbeats verifyStats(namenode, fsn, decomInfo, decomNode, true); // Stop decommissioning and verify stats - writeConfigFile(excludeFile, null); - refreshNodes(fsn, conf); DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo); + putNodeInService(i, retInfo); DataNode retNode = getDataNode(decomInfo); - waitNodeState(retInfo, AdminStates.NORMAL); verifyStats(namenode, fsn, retInfo, retNode, false); } } private DataNode getDataNode(DatanodeInfo decomInfo) { DataNode decomNode = null; - for (DataNode dn: cluster.getDataNodes()) { + for (DataNode dn: getCluster().getDataNodes()) { if (decomInfo.equals(dn.getDatanodeId())) { decomNode = dn; break; @@ -789,22 +568,16 @@ public void testHostsFileFederation() throws IOException, InterruptedException { public void testHostsFile(int numNameNodes) throws IOException, InterruptedException { int numDatanodes = 1; - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) - .numDataNodes(numDatanodes).setupHostsFile(true).build(); - cluster.waitActive(); - + startCluster(numNameNodes, numDatanodes, true, null, false); + // Now empty hosts file and ensure the datanode is disallowed // from talking to namenode, resulting in it's shutdown. 
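The refreshNodes(j) calls below go through a base-class wrapper rather than the old static helper. A plausible shape for it, as a hedged sketch (the actual body is in the new AdminStatesBaseTest, which this excerpt does not show; DatanodeManager.refreshNodes(Configuration) is the same call the removed code made):

    protected void refreshNodes(final int nnIndex) throws IOException {
      // Ask the namenode to re-read the include/exclude (or combined JSON)
      // host lists.
      getCluster().getNamesystem(nnIndex).getBlockManager()
          .getDatanodeManager().refreshNodes(getConf());
    }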
- ArrayList<String> list = new ArrayList<String>(); final String bogusIp = "127.0.30.1"; - list.add(bogusIp); - writeConfigFile(hostsFile, list); - + initIncludeHost(bogusIp); + for (int j = 0; j < numNameNodes; j++) { - refreshNodes(cluster.getNamesystem(j), conf); - - DFSClient client = getDfsClient(cluster.getNameNode(j), conf); + refreshNodes(j); + DFSClient client = getDfsClient(j); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); for (int i = 0 ; i < 5 && info.length != 0; i++) { LOG.info("Waiting for datanode to be marked dead"); @@ -828,19 +601,20 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedExcept LOG.info("Starting test testDecommissionWithOpenfile"); //At most 4 nodes will be decommissioned - startCluster(1, 7, conf); + startCluster(1, 7); - FileSystem fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); - + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + String openFile = "/testDecommissionWithOpenfile.dat"; writeFile(fileSys, new Path(openFile), (short)3); // make sure the file was open for write FSDataOutputStream fdos = fileSys.append(new Path(openFile)); - LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(cluster.getNameNode(0), openFile, 0, fileSize); - + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(0), openFile, 0, fileSize); + DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations(); DatanodeInfo[] dnInfos4FirstBlock = lbs.get(0).getLocations(); @@ -863,12 +637,12 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedExcept //decommission one of the 3 nodes which have last block nodes.add(dnInfos4LastBlock[0].getXferAddr()); dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0])); - - writeConfigFile(excludeFile, nodes); - refreshNodes(ns, conf); + + initExcludeHosts(nodes); + refreshNodes(0); for (DatanodeInfo dn : dnInfos) { waitNodeState(dn, AdminStates.DECOMMISSIONED); - } + } fdos.close(); } @@ -882,31 +656,32 @@ public void testDecommissionWithNamenodeRestart()throws IOException, Interrupted int numNamenodes = 1; int numDatanodes = 1; int replicas = 1; - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5); + getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5); - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); Path file1 = new Path("testDecommissionWithNamenodeRestart.dat"); - FileSystem fileSys = cluster.getFileSystem(); + FileSystem fileSys = getCluster().getFileSystem(); writeFile(fileSys, file1, replicas); - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeID excludedDatanodeID = info[0]; String excludedDatanodeName = info[0].getXferAddr(); - writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName))); + initExcludeHost(excludedDatanodeName); //Add a new datanode to cluster - cluster.startDataNodes(conf, 1, true, null, null, null, null); + getCluster().startDataNodes(getConf(), 1, true, null, null, null, null); numDatanodes+=1; - assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size()); + assertEquals("Number of datanodes should be 2 ",
2, + getCluster().getDataNodes().size()); //Restart the namenode - cluster.restartNameNode(); + getCluster().restartNameNode(); DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode( - cluster.getNamesystem(), excludedDatanodeID); + getCluster().getNamesystem(), excludedDatanodeID); waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED); // Ensure decommissioned datanode is not automatically shutdown @@ -919,9 +694,8 @@ public void testDecommissionWithNamenodeRestart()throws IOException, Interrupted cleanupFile(fileSys, file1); // Restart the cluster and ensure recommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(numNamenodes, numDatanodes, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(numNamenodes, numDatanodes); } /** @@ -933,30 +707,30 @@ public void testDeadNodeCountAfterNamenodeRestart()throws Exception { int numNamenodes = 1; int numDatanodes = 2; - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeInfo excludedDatanode = info[0]; String excludedDatanodeName = info[0].getXferAddr(); - writeConfigFile(hostsFile, new ArrayList<String>(Arrays.asList( - excludedDatanodeName, info[1].getXferAddr()))); - decommissionNode(0, excludedDatanode.getDatanodeUuid(), null, + List<String> hosts = new ArrayList<String>(Arrays.asList( + excludedDatanodeName, info[1].getXferAddr())); + initIncludeHosts(hosts.toArray(new String[hosts.size()])); + takeNodeOutofService(0, excludedDatanode.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); - cluster.stopDataNode(excludedDatanodeName); + getCluster().stopDataNode(excludedDatanodeName); DFSTestUtil.waitForDatanodeState( - cluster, excludedDatanode.getDatanodeUuid(), false, 20000); + getCluster(), excludedDatanode.getDatanodeUuid(), false, 20000); //Restart the namenode - cluster.restartNameNode(); + getCluster().restartNameNode(); assertEquals("There should be one node alive", 1, client.datanodeReport(DatanodeReportType.LIVE).length); assertEquals("There should be one node dead", 1, client.datanodeReport(DatanodeReportType.DEAD).length); - cluster.shutdown(); } /** @@ -976,7 +750,6 @@ public void testDeadNodeCountAfterNamenodeRestart()throws Exception { @Ignore @Test(timeout=360000) public void testIncludeByRegistrationName() throws Exception { - Configuration hdfsConf = new Configuration(conf); // Any IPv4 address starting with 127 functions as a "loopback" address // which is connected to the current host. So by choosing 127.0.0.100 // as our registration name, we have chosen a name which is also a valid // way of reaching the current host. // Typically, a registration name would be a hostname, but we don't want // to deal with DNS in this test. final String registrationName = "127.0.0.100"; final String nonExistentDn = "127.0.0.10"; - hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName); - cluster = new MiniDFSCluster.Builder(hdfsConf) - .numDataNodes(1).checkDataNodeHostConfig(true) - .setupHostsFile(true).build(); - cluster.waitActive(); + getConf().set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName); + startCluster(1, 1, false, null, true); // Set up an includes file that doesn't have our datanode.
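The five-argument startCluster overload used just above is inferred from its call sites in this file as (numNameNodes, numDataNodes, setupHostsFile, simulatedCapacities, checkDataNodeHostConfig). A hedged sketch of how it could wrap the MiniDFSCluster builder; the actual body lives in the new AdminStatesBaseTest, and setCluster is a hypothetical accessor on that base class:

    protected void startCluster(int numNameNodes, int numDataNodes,
        boolean setupHostsFile, long[] simulatedCapacities,
        boolean checkDataNodeHostConfig) throws IOException {
      MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(getConf())
          .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
          .numDataNodes(numDataNodes)
          .setupHostsFile(setupHostsFile)
          .checkDataNodeHostConfig(checkDataNodeHostConfig);
      if (simulatedCapacities != null) {
        builder.simulatedCapacities(simulatedCapacities);
      }
      setCluster(builder.build());  // hypothetical setter on the base class
      getCluster().waitActive();
    }

The include-file manipulation below then goes through the same HostsFileWriter plumbing as the decommission tests.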
- ArrayList<String> nodes = new ArrayList<String>(); - nodes.add(nonExistentDn); - writeConfigFile(hostsFile, nodes); - refreshNodes(cluster.getNamesystem(0), hdfsConf); + initIncludeHost(nonExistentDn); + refreshNodes(0); // Wait for the DN to be marked dead. LOG.info("Waiting for DN to be marked as dead."); - final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf); + final DFSClient client = getDfsClient(0); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { BlockManagerTestUtil - .checkHeartbeat(cluster.getNamesystem().getBlockManager()); + .checkHeartbeat(getCluster().getNamesystem().getBlockManager()); try { DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD); return info.length == 1; @@ -1017,13 +785,11 @@ public Boolean get() { // Use a non-empty include file with our registration name. // It should work. - int dnPort = cluster.getDataNodes().get(0).getXferPort(); - nodes = new ArrayList<String>(); - nodes.add(registrationName + ":" + dnPort); - writeConfigFile(hostsFile, nodes); - refreshNodes(cluster.getNamesystem(0), hdfsConf); - cluster.restartDataNode(0); - cluster.triggerHeartbeats(); + int dnPort = getCluster().getDataNodes().get(0).getXferPort(); + initIncludeHost(registrationName + ":" + dnPort); + refreshNodes(0); + getCluster().restartDataNode(0); + getCluster().triggerHeartbeats(); // Wait for the DN to come back. LOG.info("Waiting for DN to come back."); @@ -1031,7 +797,7 @@ public Boolean get() { @Override public Boolean get() { BlockManagerTestUtil - .checkHeartbeat(cluster.getNamesystem().getBlockManager()); + .checkHeartbeat(getCluster().getNamesystem().getBlockManager()); try { DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE); if (info.length == 1) { @@ -1050,20 +816,19 @@ public Boolean get() { @Test(timeout=120000) public void testBlocksPerInterval() throws Exception { - Configuration newConf = new Configuration(conf); org.apache.log4j.Logger.getLogger(DecommissionManager.class) .setLevel(Level.TRACE); // Turn the blocks per interval way down - newConf.setInt( + getConf().setInt( DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 3); // Disable the normal monitor runs - newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, Integer.MAX_VALUE); - startCluster(1, 3, newConf); - final FileSystem fs = cluster.getFileSystem(); + startCluster(1, 3); + final FileSystem fs = getCluster().getFileSystem(); final DatanodeManager datanodeManager = - cluster.getNamesystem().getBlockManager().getDatanodeManager(); + getCluster().getNamesystem().getBlockManager().getDatanodeManager(); final DecommissionManager decomManager = datanodeManager.getDecomManager(); // Write a 3 block file, so each node has one block. Should scan 3 nodes.
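Both knobs above come from DFSConfigKeys: dfs.namenode.decommission.blocks.per.interval caps how many blocks one monitor tick scans, and dfs.namenode.decommission.interval (in seconds) controls how often the monitor runs; the test disables the timer and drives ticks by hand through BlockManagerTestUtil.recheckDecommissionState in doDecomCheck below. A minimal standalone configuration with the same effect (the values are the test's, not the defaults):

    Configuration conf = new HdfsConfiguration();
    // Scan at most 3 blocks per monitor tick.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 3);
    // Effectively disable the periodic monitor; ticks are driven manually.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
        Integer.MAX_VALUE);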
@@ -1085,10 +850,9 @@ private void doDecomCheck(DatanodeManager datanodeManager, throws IOException, ExecutionException, InterruptedException { // Decom all nodes ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); - for (DataNode d: cluster.getDataNodes()) { - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + for (DataNode d: getCluster().getDataNodes()) { + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); } // Run decom scan and check @@ -1097,26 +861,25 @@ private void doDecomCheck(DatanodeManager datanodeManager, decomManager.getNumNodesChecked()); // Recommission all nodes for (DatanodeInfo dn : decommissionedNodes) { - recommissionNode(0, dn); + putNodeInService(0, dn); } } @Test(timeout=120000) public void testPendingNodes() throws Exception { - Configuration newConf = new Configuration(conf); org.apache.log4j.Logger.getLogger(DecommissionManager.class) .setLevel(Level.TRACE); // Only allow one node to be decom'd at a time - newConf.setInt( + getConf().setInt( DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 1); // Disable the normal monitor runs - newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, Integer.MAX_VALUE); - startCluster(1, 3, newConf); - final FileSystem fs = cluster.getFileSystem(); + startCluster(1, 3); + final FileSystem fs = getCluster().getFileSystem(); final DatanodeManager datanodeManager = - cluster.getNamesystem().getBlockManager().getDatanodeManager(); + getCluster().getNamesystem().getBlockManager().getDatanodeManager(); final DecommissionManager decomManager = datanodeManager.getDecomManager(); // Keep a file open to prevent decom from progressing @@ -1125,16 +888,15 @@ public void testPendingNodes() throws Exception { // Flush and trigger block reports so the block definitely shows up on NN open1.write(123); open1.hflush(); - for (DataNode d: cluster.getDataNodes()) { + for (DataNode d: getCluster().getDataNodes()) { DataNodeTestUtils.triggerBlockReport(d); } // Decom two nodes, so one is still alive ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); for (int i=0; i<2; i++) { - final DataNode d = cluster.getDataNodes().get(i); - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + final DataNode d = getCluster().getDataNodes().get(i); + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); } @@ -1145,10 +907,9 @@ public void testPendingNodes() throws Exception { // Close file, try to decom the last node, should get stuck in tracked open1.close(); - final DataNode d = cluster.getDataNodes().get(2); - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + final DataNode d = getCluster().getDataNodes().get(2); + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); @@ -1171,16 +932,11 @@ private void assertTrackedAndPending(DecommissionManager decomManager, */ @Test public void testCountOnDecommissionedNodeList() throws IOException{ -
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1); try { - cluster = - new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1)) - .numDataNodes(1).build(); - cluster.waitActive(); - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); - validateCluster(client, 1); + startCluster(1, 1); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(1); @@ -1188,10 +944,10 @@ public void testCountOnDecommissionedNodeList() throws IOException{ // Move datanode1 to Decommissioned state ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0); - decommissionNode(0, null, - decommissionedNode, AdminStates.DECOMMISSIONED); + takeNodeOutofService(0, null, 0, decommissionedNode, + AdminStates.DECOMMISSIONED); - FSNamesystem ns = cluster.getNamesystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager(); List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); @@ -1202,7 +958,7 @@ public void testCountOnDecommissionedNodeList() throws IOException{ datanodeManager.fetchDatanodes(live, null, true); assertTrue(0==live.size()); }finally { - cluster.shutdown(); + shutdownCluster(); } } @@ -1235,21 +991,15 @@ public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity, Map<String, Map<String, Object>> usage = null; DatanodeInfo decommissionedNodeInfo = null; String zeroNodeUsage = "0.00%"; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1); FileSystem fileSys = null; Path file1 = new Path("testNodeUsage.dat"); try { - SimulatedFSDataset.setFactory(conf); - cluster = - new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1)) - .numDataNodes(numDatanodes) - .simulatedCapacities(nodesCapacity).build(); - cluster.waitActive(); - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); - validateCluster(client, numDatanodes); + SimulatedFSDataset.setFactory(getConf()); + startCluster(1, numDatanodes, false, nodesCapacity, false); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(1); @@ -1258,12 +1008,12 @@ public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity, if (decommissionState == AdminStates.DECOMMISSIONED) { // Move datanode1 to Decommissioned state ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0); - decommissionedNodeInfo = decommissionNode(0, null, + decommissionedNodeInfo = takeNodeOutofService(0, null, 0, decommissionedNode, decommissionState); } // Write a file(replica 1).Hence will be written to only one live node. - fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); + fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); writeFile(fileSys, file1, 1); Thread.sleep(2000); @@ -1276,7 +1026,7 @@ public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity, // Start decommissioning datanode ArrayList<DatanodeInfo> decommissioningNodes = namenodeDecomList.
get(0); - decommissionedNodeInfo = decommissionNode(0, null, + decommissionedNodeInfo = takeNodeOutofService(0, null, 0, decommissioningNodes, decommissionState); // NodeUsage should not include DECOMMISSION_INPROGRESS node // (minUsage should be 0.00%) @@ -1286,7 +1036,7 @@ public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity, equalsIgnoreCase(zeroNodeUsage)); } // Recommission node - recommissionNode(0, decommissionedNodeInfo); + putNodeInService(0, decommissionedNodeInfo); usage = (Map<String, Map<String, Object>>) JSON.parse(ns.getNodeUsage()); String nodeusageAfterRecommi = @@ -1297,7 +1047,6 @@ public void nodeUsageVerification(int numDatanodes, long[] nodesCapacity, equalsIgnoreCase(nodeusageAfterRecommi)); } finally { cleanupFile(fileSys, file1); - cluster.shutdown(); } } @@ -1306,9 +1055,8 @@ public void testUsedCapacity() throws Exception { int numNamenodes = 1; int numDatanodes = 2; - startCluster(numNamenodes,numDatanodes,conf); - cluster.waitActive(); - FSNamesystem ns = cluster.getNamesystem(0); + startCluster(numNamenodes, numDatanodes); + FSNamesystem ns = getCluster().getNamesystem(0); BlockManager blockManager = ns.getBlockManager(); DatanodeStatistics datanodeStatistics = blockManager.getDatanodeManager() .getDatanodeStatistics(); @@ -1318,11 +1066,11 @@ public void testUsedCapacity() throws Exception { long initialBlockPoolUsed = datanodeStatistics.getBlockPoolUsed(); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes); - namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes)); + namenodeDecomList.add(0, new ArrayList<>(numDatanodes)); ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0); //decommission one node - DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(0, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); long newUsedCapacity = datanodeStatistics.getCapacityUsed(); long newTotalCapacity = datanodeStatistics.getCapacityTotal(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java new file mode 100644 index 0000000000..63617ad5ea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.util.Time; +import org.junit.Test; + +/** + * This class tests node maintenance. + */ +public class TestMaintenanceState extends AdminStatesBaseTest { + public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class); + static private final long EXPIRATION_IN_MS = 500; + + public TestMaintenanceState() { + setUseCombinedHostFileManager(); + } + + /** + * Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to + * AdminStates.NORMAL. + */ + @Test(timeout = 360000) + public void testTakeNodeOutOfEnteringMaintenance() throws Exception { + LOG.info("Starting testTakeNodeOutOfEnteringMaintenance"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); + + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + + cleanupFile(fileSys, file1); + } + + /** + * Verify an AdminStates.ENTERING_MAINTENANCE node can expire and transition + * to AdminStates.NORMAL upon timeout. + */ + @Test(timeout = 360000) + public void testEnteringMaintenanceExpiration() throws Exception { + LOG.info("Starting testEnteringMaintenanceExpiration"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + // expires in 500 milliseconds + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.ENTERING_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * Verify a node stays in AdminStates.NORMAL with an invalid expiration. + */ + @Test(timeout = 360000) + public void testInvalidExpiration() throws Exception { + LOG.info("Starting testInvalidExpiration"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + // expiration has to be greater than Time.monotonicNow().
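+ // (The timestamp below is already in the past by the time the namenode + // acts on it, so the node never enters maintenance and is expected to + // remain in AdminStates.NORMAL.)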
+ takeNodeOutofService(0, null, Time.monotonicNow(), null, + AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * When a dead node is put to maintenance, it transitions directly to + * AdminStates.IN_MAINTENANCE. + */ + @Test(timeout = 360000) + public void testPutDeadNodeToMaintenance() throws Exception { + LOG.info("Starting testPutDeadNodeToMaintenance"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); + DFSTestUtil.waitForDatanodeState( + getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); + + int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes(); + int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes(); + + takeNodeOutofService(0, dnProp.datanode.getDatanodeUuid(), Long.MAX_VALUE, + null, AdminStates.IN_MAINTENANCE); + + assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes()); + assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); + + cleanupFile(fileSys, file1); + } + + /** + * When a dead node is put to maintenance, it transitions directly to + * AdminStates.IN_MAINTENANCE. Then AdminStates.IN_MAINTENANCE expires and + * transitions to AdminStates.NORMAL. + */ + @Test(timeout = 360000) + public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { + LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); + DFSTestUtil.waitForDatanodeState( + getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); + + int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes(); + int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes(); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + dnProp.datanode.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.IN_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + // no change + assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes()); + assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); + + cleanupFile(fileSys, file1); + } + + /** + * Transition from decommissioned state to maintenance state. 
+ */ + @Test(timeout = 360000) + public void testTransitionFromDecommissioned() throws IOException { + LOG.info("Starting testTransitionFromDecommissioned"); + final int numNamenodes = 1; + final int numDatanodes = 4; + final int replicas = 3; + final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, + AdminStates.DECOMMISSIONED); + + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE, + null, AdminStates.IN_MAINTENANCE); + + cleanupFile(fileSys, file1); + } + + /** + * Transition from decommissioned state to maintenance state. + * After the maintenance state expires, it is transitioned to NORMAL. + */ + @Test(timeout = 360000) + public void testTransitionFromDecommissionedAndExpired() throws IOException { + LOG.info("Starting testTransitionFromDecommissionedAndExpired"); + final int numNamenodes = 1; + final int numDatanodes = 4; + final int replicas = 3; + final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, + AdminStates.DECOMMISSIONED); + + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.IN_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * When a node is put to maintenance, it first transitions to + * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal + * replication before it can be transitioned to AdminStates.IN_MAINTENANCE. + * If a node becomes dead while it is in AdminStates.ENTERING_MAINTENANCE, + * its admin state should remain AdminStates.ENTERING_MAINTENANCE.
+ */ + @Test(timeout = 360000) + public void testNodeDeadWhenInEnteringMaintenance() throws Exception { + LOG.info("Starting testNodeDeadWhenInEnteringMaintenance"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null, + AdminStates.ENTERING_MAINTENANCE); + assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); + + MiniDFSCluster.DataNodeProperties dnProp = + getCluster().stopDataNode(nodeOutofService.getXferAddr()); + DFSTestUtil.waitForDatanodeState( + getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); + DFSClient client = getDfsClient(0); + assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, + client.datanodeReport(DatanodeReportType.LIVE).length); + + getCluster().restartDataNode(dnProp, true); + getCluster().waitActive(); + waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE); + assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); + + cleanupFile(fileSys, file1); + } + + static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys, + Path name) throws IOException { + // need a raw stream + assertTrue("Not HDFS:"+fileSys.getUri(), + fileSys instanceof DistributedFileSystem); + HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name); + Collection<LocatedBlock> dinfo = dis.getAllBlocks(); + for (LocatedBlock blk : dinfo) { // for each block + DatanodeInfo[] nodes = blk.getLocations(); + if (nodes.length > 0) { + return nodes[0].getDatanodeUuid(); + } + } + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 7c39bf884f..6bb60409c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -158,7 +158,7 @@ private void decommissionNode(FSNamesystem namesystem, String dnName) // write nodename into the exclude file.
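The call below now passes a List rather than a varargs array; under the hood HostsFileWriter funnels both decommission and maintenance entries through a new initOutOfServiceHosts method (next hunk). A usage sketch of that API with the combined (JSON) host file configured; the host names and the 30-minute window are invented for illustration:

    Map<String, Long> maintenance = new HashMap<>();
    maintenance.put("host1:50010", Time.monotonicNow() + 30 * 60 * 1000L);
    // host2 is excluded outright (DECOMMISSIONED); host1 is written as
    // IN_MAINTENANCE with the expiration above. With a legacy hosts file,
    // a non-empty maintenance map throws UnsupportedOperationException.
    hostsFileWriter.initOutOfServiceHosts(
        Arrays.asList("host2:50010"), maintenance);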
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes); nodes.add(dnName); - hostsFileWriter.initExcludeHosts(nodes.toArray(new String[0])); + hostsFileWriter.initExcludeHosts(nodes); } private void checkDecommissionStatus(DatanodeDescriptor decommNode, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java index 2ef0b8f2bc..4c8fcef5a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java @@ -20,8 +20,11 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; @@ -73,30 +76,60 @@ public void initialize(Configuration conf, String dir) throws IOException { } public void initExcludeHost(String hostNameAndPort) throws IOException { - initExcludeHosts(hostNameAndPort); + ArrayList<String> nodes = new ArrayList<>(); + nodes.add(hostNameAndPort); + initExcludeHosts(nodes); } - public void initExcludeHosts(String... hostNameAndPorts) throws IOException { + public void initExcludeHosts(List<String> hostNameAndPorts) + throws IOException { + initOutOfServiceHosts(hostNameAndPorts, null); + } + + public void initOutOfServiceHosts(List<String> decommissionHostNameAndPorts, + Map<String, Long> maintenanceHosts) throws IOException { StringBuilder excludeHosts = new StringBuilder(); if (isLegacyHostsFile) { - for (String hostNameAndPort : hostNameAndPorts) { + if (maintenanceHosts != null && maintenanceHosts.size() > 0) { + throw new UnsupportedOperationException( + "maintenance is not supported by legacy hosts files"); + } + for (String hostNameAndPort : decommissionHostNameAndPorts) { excludeHosts.append(hostNameAndPort).append("\n"); } - DFSTestUtil.writeFile(localFileSys, excludeFile, excludeHosts.toString()); + DFSTestUtil.writeFile(localFileSys, excludeFile, + excludeHosts.toString()); } else { HashSet<DatanodeAdminProperties> allDNs = new HashSet<>(); - for (String hostNameAndPort : hostNameAndPorts) { - DatanodeAdminProperties dn = new DatanodeAdminProperties(); - String[] hostAndPort = hostNameAndPort.split(":"); - dn.setHostName(hostAndPort[0]); - dn.setPort(Integer.parseInt(hostAndPort[1])); - dn.setAdminState(AdminStates.DECOMMISSIONED); - allDNs.add(dn); + if (decommissionHostNameAndPorts != null) { + for (String hostNameAndPort : decommissionHostNameAndPorts) { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String[] hostAndPort = hostNameAndPort.split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.DECOMMISSIONED); + allDNs.add(dn); + } + } + if (maintenanceHosts != null) { + for (Map.Entry<String, Long> hostEntry : maintenanceHosts.entrySet()) { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String[] hostAndPort = hostEntry.getKey().split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.IN_MAINTENANCE); + dn.setMaintenanceExpireTimeInMS(hostEntry.getValue()); + allDNs.add(dn); + } } CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); } } + public void initIncludeHost(String hostNameAndPort) throws IOException { + initIncludeHosts(new String[]{hostNameAndPort}); + } + public void initIncludeHosts(String[]
hostNameAndPorts) throws IOException { StringBuilder includeHosts = new StringBuilder(); if (isLegacyHostsFile) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java index 923cf66fea..b48784fbcc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java @@ -62,7 +62,7 @@ public void tearDown() throws Exception { public void testLoadExistingJsonFile() throws Exception { Set<DatanodeAdminProperties> all = CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath()); - assertEquals(5, all.size()); + assertEquals(7, all.size()); } /* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json index 64fca48dbf..9c852e01eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json @@ -3,3 +3,5 @@ {"hostName": "host3", "adminState": "DECOMMISSIONED"} {"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"} {"hostName": "host5", "port": 8090} +{"hostName": "host6", "adminState": "IN_MAINTENANCE"} +{"hostName": "host7", "adminState": "IN_MAINTENANCE", "maintenanceExpireTimeInMS": "112233"}
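The two new JSON entries above exercise the maintenance path end to end: host6 carries no explicit expiration, while host7 carries one. Maintenance entries only make sense with the combined host manager, since the legacy dfs.hosts/dfs.hosts.exclude pair has no way to express an expiration. A hedged sketch of the namenode configuration that selects the combined manager (the provider key is the one CombinedHostFileManager is registered under; the file path is illustrative):

    Configuration conf = new HdfsConfiguration();
    // Use the JSON-based combined host file instead of the legacy
    // include/exclude pair.
    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
        CombinedHostFileManager.class, HostConfigManager.class);
    // dfs.hosts now points at the single combined JSON file.
    conf.set(DFSConfigKeys.DFS_HOSTS, "/etc/hadoop/dfs.hosts.json");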