From 4fcfea71bfb16295f3a661e712d66351a1edc55e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 25 Mar 2016 17:09:12 -0700 Subject: [PATCH] HDFS-9005. Provide support for upgrade domain script. (Ming Ma via Lei Xu) --- .../protocol/DatanodeAdminProperties.java | 100 +++++++ .../hdfs/util/CombinedHostsFileReader.java | 76 ++++++ .../hdfs/util/CombinedHostsFileWriter.java | 69 +++++ .../CombinedHostFileManager.java | 250 ++++++++++++++++++ .../blockmanagement/HostConfigManager.java | 80 ++++++ .../hdfs/server/blockmanagement/HostSet.java | 114 ++++++++ ...TestUpgradeDomainBlockPlacementPolicy.java | 169 ++++++++++++ .../hadoop/hdfs/util/HostsFileWriter.java | 122 +++++++++ .../util/TestCombinedHostsFileReader.java | 79 ++++++ .../src/test/resources/dfs.hosts.json | 5 + 10 files changed, 1064 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java new file mode 100644 index 0000000000..9f7b98309d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeAdminProperties.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +/** + * The class describes the configured admin properties for a datanode. 
+ * + * It is the static configuration specified by administrators via dfsadmin + * command; different from the runtime state. CombinedHostFileManager uses + * the class to deserialize the configurations from json-based file format. + * + * To decommission a node, use AdminStates.DECOMMISSIONED. + */ +public class DatanodeAdminProperties { + private String hostName; + private int port; + private String upgradeDomain; + private AdminStates adminState = AdminStates.NORMAL; + + /** + * Return the host name of the datanode. + * @return the host name of the datanode. + */ + public String getHostName() { + return hostName; + } + + /** + * Set the host name of the datanode. + * @param hostName the host name of the datanode. + */ + public void setHostName(final String hostName) { + this.hostName = hostName; + } + + /** + * Get the port number of the datanode. + * @return the port number of the datanode. + */ + public int getPort() { + return port; + } + + /** + * Set the port number of the datanode. + * @param port the port number of the datanode. + */ + public void setPort(final int port) { + this.port = port; + } + + /** + * Get the upgrade domain of the datanode. + * @return the upgrade domain of the datanode. + */ + public String getUpgradeDomain() { + return upgradeDomain; + } + + /** + * Set the upgrade domain of the datanode. + * @param upgradeDomain the upgrade domain of the datanode. + */ + public void setUpgradeDomain(final String upgradeDomain) { + this.upgradeDomain = upgradeDomain; + } + + /** + * Get the admin state of the datanode. + * @return the admin state of the datanode. + */ + public AdminStates getAdminState() { + return adminState; + } + + /** + * Set the admin state of the datanode. + * @param adminState the admin state of the datanode. + */ + public void setAdminState(final AdminStates adminState) { + this.adminState = adminState; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java new file mode 100644 index 0000000000..33acb91f83 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileReader.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.util; + +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.Reader; + +import java.util.Iterator; +import java.util.Set; +import java.util.HashSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; + +/** + * Reader support for JSON based datanode configuration, an alternative + * to the exclude/include files configuration. + * The JSON file format is the array of elements where each element + * in the array describes the properties of a datanode. The properties of + * a datanode is defined in {@link DatanodeAdminProperties}. For example, + * + * {"hostName": "host1"} + * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"} + * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"} + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public final class CombinedHostsFileReader { + private CombinedHostsFileReader() { + } + + /** + * Deserialize a set of DatanodeAdminProperties from a json file. + * @param hostsFile the input json file to read from. + * @return the set of DatanodeAdminProperties + * @throws IOException + */ + public static Set + readFile(final String hostsFile) throws IOException { + HashSet allDNs = new HashSet<>(); + ObjectMapper mapper = new ObjectMapper(); + try (Reader input = + new InputStreamReader(new FileInputStream(hostsFile), "UTF-8")) { + Iterator iterator = + mapper.readValues(new JsonFactory().createJsonParser(input), + DatanodeAdminProperties.class); + while (iterator.hasNext()) { + DatanodeAdminProperties properties = iterator.next(); + allDNs.add(properties); + } + } + return allDNs; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java new file mode 100644 index 0000000000..ea70be2eb7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/CombinedHostsFileWriter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.util; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; + +/** + * Writer support for JSON based datanode configuration, an alternative + * to the exclude/include files configuration. + * The JSON file format is the array of elements where each element + * in the array describes the properties of a datanode. The properties of + * a datanode is defined in {@link DatanodeAdminProperties}. For example, + * + * {"hostName": "host1"} + * {"hostName": "host2", "port": 50, "upgradeDomain": "ud0"} + * {"hostName": "host3", "port": 0, "adminState": "DECOMMISSIONED"} + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public final class CombinedHostsFileWriter { + private CombinedHostsFileWriter() { + } + + /** + * Serialize a set of DatanodeAdminProperties to a json file. + * @param hostsFile the json file name. + * @param allDNs the set of DatanodeAdminProperties + * @throws IOException + */ + public static void writeFile(final String hostsFile, + final Set allDNs) throws IOException { + StringBuilder configs = new StringBuilder(); + try (Writer output = + new OutputStreamWriter(new FileOutputStream(hostsFile), "UTF-8")) { + for (DatanodeAdminProperties datanodeAdminProperties: allDNs) { + ObjectMapper mapper = new ObjectMapper(); + configs.append(mapper.writeValueAsString(datanodeAdminProperties)); + } + output.write(configs.toString()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java new file mode 100644 index 0000000000..3e913b93a2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.UnmodifiableIterator; +import com.google.common.collect.Iterables; +import com.google.common.collect.Collections2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +import java.io.IOException; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Predicate; + +import org.apache.hadoop.hdfs.util.CombinedHostsFileReader; + +/** + * This class manages datanode configuration using a json file. + * Please refer to {@link CombinedHostsFileReader} for the json format. + *
+ *
+ * Entries may or may not specify a port. If they don't, we consider + * them to apply to every DataNode on that host. The code canonicalizes the + * entries into IP addresses. + *
+ *
+ * The code ignores all entries that the DNS fails to resolve their IP + * addresses. This is okay because by default the NN rejects the registrations + * of DNs when it fails to do a forward and reverse lookup. Note that DNS + * resolutions are only done during the loading time to minimize the latency. + */ +public class CombinedHostFileManager extends HostConfigManager { + private static final Logger LOG = LoggerFactory.getLogger( + CombinedHostFileManager.class); + private Configuration conf; + private HostProperties hostProperties = new HostProperties(); + + static class HostProperties { + private Multimap allDNs = + HashMultimap.create(); + // optimization. If every node in the file isn't in service, it implies + // any node is allowed to register with nn. This is equivalent to having + // an empty "include" file. + private boolean emptyInServiceNodeLists = true; + synchronized void add(InetAddress addr, + DatanodeAdminProperties properties) { + allDNs.put(addr, properties); + if (properties.getAdminState().equals( + AdminStates.NORMAL)) { + emptyInServiceNodeLists = false; + } + } + + // If the includes list is empty, act as if everything is in the + // includes list. + synchronized boolean isIncluded(final InetSocketAddress address) { + return emptyInServiceNodeLists || Iterables.any( + allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return input.getPort() == 0 || + input.getPort() == address.getPort(); + } + }); + } + + synchronized boolean isExcluded(final InetSocketAddress address) { + return Iterables.any(allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return input.getAdminState().equals( + AdminStates.DECOMMISSIONED) && + (input.getPort() == 0 || + input.getPort() == address.getPort()); + } + }); + } + + synchronized String getUpgradeDomain(final InetSocketAddress address) { + Iterable datanode = Iterables.filter( + allDNs.get(address.getAddress()), + new Predicate() { + public boolean apply(DatanodeAdminProperties input) { + return (input.getPort() == 0 || + input.getPort() == address.getPort()); + } + }); + return datanode.iterator().hasNext() ? 
+ datanode.iterator().next().getUpgradeDomain() : null; + } + + Iterable getIncludes() { + return new Iterable() { + @Override + public Iterator iterator() { + return new HostIterator(allDNs.entries()); + } + }; + } + + Iterable getExcludes() { + return new Iterable() { + @Override + public Iterator iterator() { + return new HostIterator( + Collections2.filter(allDNs.entries(), + new Predicate>() { + public boolean apply(java.util.Map.Entry entry) { + return entry.getValue().getAdminState().equals( + AdminStates.DECOMMISSIONED); + } + } + )); + } + }; + } + + static class HostIterator extends UnmodifiableIterator { + private final Iterator> it; + public HostIterator(Collection> nodes) { + this.it = nodes.iterator(); + } + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public InetSocketAddress next() { + Map.Entry e = it.next(); + return new InetSocketAddress(e.getKey(), e.getValue().getPort()); + } + } + } + + @Override + public Iterable getIncludes() { + return hostProperties.getIncludes(); + } + + @Override + public Iterable getExcludes() { + return hostProperties.getExcludes(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void refresh() throws IOException { + refresh(conf.get(DFSConfigKeys.DFS_HOSTS, "")); + } + private void refresh(final String hostsFile) throws IOException { + HostProperties hostProps = new HostProperties(); + Set all = + CombinedHostsFileReader.readFile(hostsFile); + for(DatanodeAdminProperties properties : all) { + InetSocketAddress addr = parseEntry(hostsFile, + properties.getHostName(), properties.getPort()); + if (addr != null) { + hostProps.add(addr.getAddress(), properties); + } + } + refresh(hostProps); + } + + @VisibleForTesting + static InetSocketAddress parseEntry(final String fn, final String hostName, + final int port) { + InetSocketAddress addr = new InetSocketAddress(hostName, port); + if (addr.isUnresolved()) { + LOG.warn("Failed to resolve {} in {}. ", hostName, fn); + return null; + } + return addr; + } + + @Override + public synchronized boolean isIncluded(final DatanodeID dn) { + return hostProperties.isIncluded(dn.getResolvedAddress()); + } + + @Override + public synchronized boolean isExcluded(final DatanodeID dn) { + return isExcluded(dn.getResolvedAddress()); + } + + private boolean isExcluded(final InetSocketAddress address) { + return hostProperties.isExcluded(address); + } + + @Override + public synchronized String getUpgradeDomain(final DatanodeID dn) { + return hostProperties.getUpgradeDomain(dn.getResolvedAddress()); + } + + /** + * Set the properties lists by the new instances. The + * old instance is discarded. + * @param hostProperties the new properties list + */ + @VisibleForTesting + private void refresh(final HostProperties hostProperties) { + synchronized (this) { + this.hostProperties = hostProperties; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java new file mode 100644 index 0000000000..f28ed2997a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostConfigManager.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hdfs.protocol.DatanodeID; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * This interface abstracts how datanode configuration is managed. + * + * Each implementation defines its own way to persist the configuration. + * For example, it can use one JSON file to store the configs for all + * datanodes; or it can use one file to store in-service datanodes and another + * file to store decommission-requested datanodes. + * + * These files control which DataNodes the NameNode expects to see in the + * cluster. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class HostConfigManager implements Configurable { + + /** + * Return all the datanodes that are allowed to connect to the namenode. + * @return Iterable of all datanodes + */ + public abstract Iterable getIncludes(); + + /** + * Return all datanodes that should be in decommissioned state. + * @return Iterable of those datanodes + */ + public abstract Iterable getExcludes(); + + /** + * Check if a datanode is allowed to connect the namenode. + * @param dn the DatanodeID of the datanode + * @return boolean if dn is allowed to connect the namenode. + */ + public abstract boolean isIncluded(DatanodeID dn); + + /** + * Check if a datanode needs to be decommissioned. + * @param dn the DatanodeID of the datanode + * @return boolean if dn needs to be decommissioned. + */ + public abstract boolean isExcluded(DatanodeID dn); + + /** + * Reload the configuration. + */ + public abstract void refresh() throws IOException; + + /** + * Get the upgrade domain of a datanode. + * @param dn the DatanodeID of the datanode + * @return the upgrade domain of dn. + */ + public abstract String getUpgradeDomain(DatanodeID dn); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java new file mode 100644 index 0000000000..958557b4f8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostSet.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Multimap; +import com.google.common.collect.UnmodifiableIterator; + +import javax.annotation.Nullable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + + +/** + * The HostSet allows efficient queries on matching wildcard addresses. + *
+ * For InetSocketAddress A and B with the same host address, + * we define a partial order between A and B, A <= B iff A.getPort() == B + * .getPort() || B.getPort() == 0. + */ +public class HostSet implements Iterable { + // Host -> lists of ports + private final Multimap addrs = HashMultimap.create(); + + /** + * The function that checks whether there exists an entry foo in the set + * so that foo <= addr. + */ + boolean matchedBy(InetSocketAddress addr) { + Collection ports = addrs.get(addr.getAddress()); + return addr.getPort() == 0 ? !ports.isEmpty() : ports.contains(addr + .getPort()); + } + + /** + * The function that checks whether there exists an entry foo in the set + * so that addr <= foo. + */ + boolean match(InetSocketAddress addr) { + int port = addr.getPort(); + Collection ports = addrs.get(addr.getAddress()); + boolean exactMatch = ports.contains(port); + boolean genericMatch = ports.contains(0); + return exactMatch || genericMatch; + } + + boolean isEmpty() { + return addrs.isEmpty(); + } + + int size() { + return addrs.size(); + } + + void add(InetSocketAddress addr) { + Preconditions.checkArgument(!addr.isUnresolved()); + addrs.put(addr.getAddress(), addr.getPort()); + } + + @Override + public Iterator iterator() { + return new UnmodifiableIterator() { + private final Iterator> it = addrs.entries().iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public InetSocketAddress next() { + Map.Entry e = it.next(); + return new InetSocketAddress(e.getKey(), e.getValue()); + } + }; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("HostSet("); + Joiner.on(",").appendTo(sb, Iterators.transform(iterator(), + new Function() { + @Override + public String apply(@Nullable InetSocketAddress addr) { + assert addr != null; + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + })); + return sb.append(")").toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java new file mode 100644 index 0000000000..cc14fcbb04 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; +import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.util.HostsFileWriter; +import org.apache.hadoop.net.StaticMapping; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * End-to-end test case for upgrade domain + * The test configs upgrade domain for nodes via admin json + * config file and put some nodes to decommission state. + * The test then verifies replicas are placed on the nodes that + * satisfy the upgrade domain policy. + * + */ +public class TestUpgradeDomainBlockPlacementPolicy { + + private static final short REPLICATION_FACTOR = (short) 3; + private static final int DEFAULT_BLOCK_SIZE = 1024; + static final String[] racks = + { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" }; + /** + * Use host names that can be resolved ( + * InetSocketAddress#isUnresolved == false). Otherwise, + * CombinedHostFileManager won't allow those hosts. 
+ */ + static final String[] hosts = + { "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1", + "127.0.0.1", "127.0.0.1" }; + static final String[] upgradeDomains = + { "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" }; + static final Set expectedDatanodeIDs = new HashSet<>(); + private MiniDFSCluster cluster = null; + private NamenodeProtocols nameNodeRpc = null; + private FSNamesystem namesystem = null; + private PermissionStatus perm = null; + + @Before + public void setup() throws IOException { + StaticMapping.resetMap(); + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + BlockPlacementPolicyWithUpgradeDomain.class, + BlockPlacementPolicy.class); + conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + CombinedHostFileManager.class, HostConfigManager.class); + HostsFileWriter hostsFileWriter = new HostsFileWriter(); + hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy"); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) + .hosts(hosts).build(); + cluster.waitActive(); + nameNodeRpc = cluster.getNameNodeRpc(); + namesystem = cluster.getNamesystem(); + perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null, + FsPermission.getDefault()); + refreshDatanodeAdminProperties(hostsFileWriter); + hostsFileWriter.cleanup(); + } + + @After + public void teardown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Define admin properties for these datanodes as follows. + * dn0 and dn3 have upgrade domain ud1. + * dn1 and dn4 have upgrade domain ud2. + * dn2 and dn5 have upgrade domain ud3. + * dn0 and dn5 are decommissioned. + * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on + * rack2. Then any block's replicas should be on either + * {dn1, dn2, d3} or {dn2, dn3, dn4}. 
+ */ + private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter) + throws IOException { + DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[ + hosts.length]; + for (int i = 0; i < hosts.length; i++) { + datanodes[i] = new DatanodeAdminProperties(); + DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId(); + datanodes[i].setHostName(datanodeID.getHostName()); + datanodes[i].setPort(datanodeID.getXferPort()); + datanodes[i].setUpgradeDomain(upgradeDomains[i]); + } + datanodes[0].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + datanodes[5].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + hostsFileWriter.initIncludeHosts(datanodes); + cluster.getFileSystem().refreshNodes(); + + expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId()); + expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId()); + } + + @Test + public void testPlacement() throws Exception { + String clientMachine = "127.0.0.1"; + for (int i = 0; i < 5; i++) { + String src = "/test-" + i; + // Create the file with client machine + HdfsFileStatus fileStatus = namesystem.startFile(src, perm, + clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, + REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); + LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, + null, null, fileStatus.getFileId(), null); + + assertEquals("Block should be allocated sufficient locations", + REPLICATION_FACTOR, locatedBlock.getLocations().length); + Set locs = new HashSet<>(Arrays.asList( + locatedBlock.getLocations())); + for (DatanodeID datanodeID : expectedDatanodeIDs) { + locs.contains(datanodeID); + } + + nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), + src, clientMachine); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java new file mode 100644 index 0000000000..cd5ae95497 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.util; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; +import org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; + +import static org.junit.Assert.assertTrue; + +public class HostsFileWriter { + private FileSystem localFileSys; + private Path fullDir; + private Path excludeFile; + private Path includeFile; + private Path combinedFile; + private boolean isLegacyHostsFile = false; + + public void initialize(Configuration conf, String dir) throws IOException { + localFileSys = FileSystem.getLocal(conf); + Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); + this.fullDir = new Path(workingDir, dir); + assertTrue(localFileSys.mkdirs(this.fullDir)); + + if (conf.getClass( + DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, + HostFileManager.class, HostConfigManager.class).equals( + HostFileManager.class)) { + isLegacyHostsFile = true; + } + if (isLegacyHostsFile) { + excludeFile = new Path(fullDir, "exclude"); + includeFile = new Path(fullDir, "include"); + DFSTestUtil.writeFile(localFileSys, excludeFile, ""); + DFSTestUtil.writeFile(localFileSys, includeFile, ""); + conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); + conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath()); + } else { + combinedFile = new Path(fullDir, "all"); + conf.set(DFSConfigKeys.DFS_HOSTS, combinedFile.toString()); + } + } + + public void initExcludeHost(String hostNameAndPort) throws IOException { + if (isLegacyHostsFile) { + DFSTestUtil.writeFile(localFileSys, excludeFile, hostNameAndPort); + } else { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String [] hostAndPort = hostNameAndPort.split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.DECOMMISSIONED); + HashSet allDNs = new HashSet<>(); + allDNs.add(dn); + CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); + } + } + + public void initIncludeHosts(String[] hostNameAndPorts) throws IOException { + StringBuilder includeHosts = new StringBuilder(); + if (isLegacyHostsFile) { + for(String hostNameAndPort : hostNameAndPorts) { + includeHosts.append(hostNameAndPort).append("\n"); + } + DFSTestUtil.writeFile(localFileSys, includeFile, + includeHosts.toString()); + } else { + HashSet allDNs = new HashSet<>(); + for(String hostNameAndPort : hostNameAndPorts) { + String[] hostAndPort = hostNameAndPort.split(":"); + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + allDNs.add(dn); + } + CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); + } + } + + public void initIncludeHosts(DatanodeAdminProperties[] datanodes) + throws IOException { + CombinedHostsFileWriter.writeFile(combinedFile.toString(), + new HashSet<>(Arrays.asList(datanodes))); + } + + public void cleanup() throws IOException { + if 
(localFileSys.exists(fullDir)) { + FileUtils.deleteQuietly(new File(fullDir.toUri().getPath())); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java new file mode 100644 index 0000000000..c3946e412b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.File; +import java.io.FileWriter; + +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/* + * Test for JSON based HostsFileReader + */ +public class TestCombinedHostsFileReader { + + // Using /test/build/data/tmp directory to store temporary files + static final String HOSTS_TEST_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).getAbsolutePath(); + File NEW_FILE = new File(HOSTS_TEST_DIR, "dfs.hosts.new.json"); + + static final String TEST_CACHE_DATA_DIR = + System.getProperty("test.cache.data", "build/test/cache"); + File EXISTING_FILE = new File(TEST_CACHE_DATA_DIR, "dfs.hosts.json"); + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + // Delete test file after running tests + NEW_FILE.delete(); + + } + + /* + * Load the existing test json file + */ + @Test + public void testLoadExistingJsonFile() throws Exception { + Set all = + CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath()); + assertEquals(5, all.size()); + } + + /* + * Test empty json config file + */ + @Test + public void testEmptyCombinedHostsFileReader() throws Exception { + FileWriter hosts = new FileWriter(NEW_FILE); + hosts.write(""); + hosts.close(); + Set all = + CombinedHostsFileReader.readFile(NEW_FILE.getAbsolutePath()); + assertEquals(0, all.size()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json new file mode 100644 index 0000000000..64fca48dbf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json @@ -0,0 +1,5 @@ +{"hostName": "host1"} +{"hostName": "host2", "upgradeDomain": "ud0"} +{"hostName": "host3", "adminState": "DECOMMISSIONED"} +{"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"} +{"hostName": "host5", "port": 8090}
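
For illustration, the combined hosts file consumed by CombinedHostsFileReader is a plain sequence of JSON objects, one DatanodeAdminProperties entry per datanode, as in the dfs.hosts.json test resource above. An entry that omits the port (or sets it to 0) applies to every DataNode on that host, and entries whose host names cannot be resolved by DNS are ignored at load time. A hypothetical file for a small cluster might look like this; the host names, ports and upgrade domain labels are placeholders:

    {"hostName": "dn1.example.com", "upgradeDomain": "ud1"}
    {"hostName": "dn2.example.com", "upgradeDomain": "ud2"}
    {"hostName": "dn3.example.com", "port": 50010, "upgradeDomain": "ud3", "adminState": "DECOMMISSIONED"}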
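
Wiring the NameNode to this format is done through configuration: the test above sets DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to CombinedHostFileManager, points DFSConfigKeys.DFS_HOSTS at the combined file, and selects BlockPlacementPolicyWithUpgradeDomain via DFS_BLOCK_REPLICATOR_CLASSNAME_KEY. A rough hdfs-site.xml sketch of the same setup follows, assuming the standard property names behind those keys; the file path is a placeholder. After the file is edited, hdfs dfsadmin -refreshNodes makes the NameNode reload it through CombinedHostFileManager#refresh().

    <property>
      <name>dfs.namenode.hosts.provider.classname</name>
      <value>org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager</value>
    </property>
    <property>
      <name>dfs.hosts</name>
      <!-- placeholder path to the combined JSON hosts file -->
      <value>/etc/hadoop/dfs.hosts.json</value>
    </property>
    <property>
      <name>dfs.block.replicator.classname</name>
      <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain</value>
    </property>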
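
The reader and writer added by this patch can also be driven directly from Java, which is what the HostsFileWriter test utility does. A minimal sketch using the APIs above, with placeholder host names and a placeholder output path:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
    import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
    import org.apache.hadoop.hdfs.util.CombinedHostsFileWriter;

    public class CombinedHostsFileExample {
      public static void main(String[] args) throws Exception {
        Set<DatanodeAdminProperties> dns = new HashSet<>();

        // In-service datanode in upgrade domain ud1; leaving the port at 0
        // makes the entry apply to every DataNode on that host.
        DatanodeAdminProperties dn1 = new DatanodeAdminProperties();
        dn1.setHostName("dn1.example.com");     // placeholder host name
        dn1.setUpgradeDomain("ud1");
        dns.add(dn1);

        // Datanode that should be moved to the DECOMMISSIONED admin state.
        DatanodeAdminProperties dn2 = new DatanodeAdminProperties();
        dn2.setHostName("dn2.example.com");     // placeholder host name
        dn2.setPort(50010);
        dn2.setAdminState(AdminStates.DECOMMISSIONED);
        dns.add(dn2);

        // Serialize the entries to a combined hosts file and read them back.
        String hostsFile = "/tmp/dfs.hosts.json";  // placeholder path
        CombinedHostsFileWriter.writeFile(hostsFile, dns);
        Set<DatanodeAdminProperties> loaded =
            CombinedHostsFileReader.readFile(hostsFile);
        System.out.println("Loaded " + loaded.size() + " datanode entries");
      }
    }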