diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index cb031a9c83..605f502c02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -438,7 +438,7 @@ public long getTotalECBlockGroups() { private double reconstructionQueuesInitProgress = 0.0; /** for block replicas placement */ - private BlockPlacementPolicies placementPolicies; + private volatile BlockPlacementPolicies placementPolicies; private final BlockStoragePolicySuite storagePolicySuite; /** Check whether name system is running before terminating */ @@ -775,6 +775,14 @@ public BlockPlacementPolicy getBlockPlacementPolicy() { return placementPolicies.getPolicy(CONTIGUOUS); } + public void refreshBlockPlacementPolicy(Configuration conf) { + BlockPlacementPolicies bpp = + new BlockPlacementPolicies(conf, datanodeManager.getFSClusterStats(), + datanodeManager.getNetworkTopology(), + datanodeManager.getHost2DatanodeMap()); + placementPolicies = bpp; + } + /** Dump meta data to out. */ public void metaSave(PrintWriter out) { assert namesystem.hasReadLock(); // TODO: block manager read lock and NS write lock diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 2a74190995..74757e563a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -184,6 +184,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ToolRunner.confirmPrompt; @@ -322,7 +324,9 @@ public enum OperationCategory { DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, - DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)); + DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)); private static final String USAGE = "Usage: hdfs namenode [" + StartupOption.BACKUP.getName() + "] | \n\t[" @@ -2179,6 +2183,10 @@ protected String reconfigurePropertyImpl(String property, String newVal) || property.equals( DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) { return reconfReplicationParameters(newVal, property); + } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property + .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) { + reconfBlockPlacementPolicy(); + return newVal; } else { throw new ReconfigurationException(property, newVal, getConf().get( property)); @@ -2223,6 +2231,11 @@ private String reconfReplicationParameters(final String newVal, } } + private void reconfBlockPlacementPolicy() { + getNamesystem().getBlockManager() + .refreshBlockPlacementPolicy(getNewConf()); + } + private int adjustNewVal(int defaultVal, String newVal) { if (newVal == null) { return defaultVal; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java new file mode 100644 index 0000000000..b431db7ac6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.net.Node; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; +import static org.junit.Assert.assertEquals; + +/** + * Test refresh block placement policy. + */ +public class TestRefreshBlockPlacementPolicy { + private MiniDFSCluster cluster; + private Configuration config; + private static int counter = 0; + static class MockBlockPlacementPolicy extends BlockPlacementPolicyDefault { + @Override + public DatanodeStorageInfo[] chooseTarget(String srcPath, + int numOfReplicas, + Node writer, + List chosen, + boolean returnChosenNodes, + Set excludedNodes, + long blocksize, + BlockStoragePolicy storagePolicy, + EnumSet flags) { + counter++; + return super.chooseTarget(srcPath, numOfReplicas, writer, chosen, + returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags); + } + } + + @Before + public void setup() throws IOException { + config = new Configuration(); + config.setClass(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + MockBlockPlacementPolicy.class, BlockPlacementPolicy.class); + config.setClass(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, + MockBlockPlacementPolicy.class, BlockPlacementPolicy.class); + cluster = new MiniDFSCluster.Builder(config).numDataNodes(9).build(); + cluster.waitActive(); + } + + @After + public void cleanup() throws IOException { + cluster.shutdown(); + } + + @Test + public void testRefreshReplicationPolicy() throws Exception { + Path file = new Path("/test-file"); + DistributedFileSystem dfs = cluster.getFileSystem(); + + verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode() + .reconfigurePropertyImpl(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, null)); + } + + @Test + public void testRefreshEcPolicy() throws Exception { + Path ecDir = new Path("/ec"); + Path file = new Path("/ec/test-file"); + DistributedFileSystem dfs = cluster.getFileSystem(); + dfs.mkdir(ecDir, FsPermission.createImmutable((short)755)); + dfs.setErasureCodingPolicy(ecDir, null); + + verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode() + .reconfigurePropertyImpl(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, null)); + } + + @FunctionalInterface + private interface Refresh { + void refresh() throws ReconfigurationException; + } + + private void verifyRefreshPolicy(DistributedFileSystem dfs, Path file, + Refresh func) throws IOException, ReconfigurationException { + // Choose datanode using the mock policy. + int lastCounter = counter; + OutputStream out = dfs.create(file, true); + out.write("test".getBytes()); + out.close(); + assert(counter > lastCounter); + + // Refresh to the default policy. + func.refresh(); + + lastCounter = counter; + dfs.delete(file, true); + out = dfs.create(file, true); + out.write("test".getBytes()); + out.close(); + assertEquals(lastCounter, counter); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index fb3b58413c..366e07c15e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -419,9 +421,11 @@ public void testNameNodeGetReconfigurableProperties() throws IOException { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("namenode", address, outs, errs); - assertEquals(10, outs.size()); - assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1)); - assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2)); + assertEquals(12, outs.size()); + assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1)); + assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2)); + assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3)); + assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4)); assertEquals(errs.size(), 0); }