diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index 9188a55204..9fbc774673 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -35,3 +35,5 @@ HDFS-1971. Send block report from datanode to both active and standby namenodes.
 HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh)
 
 HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
+
+HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 85807f6d5a..62b825be56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -19,7 +19,9 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +44,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -89,6 +93,29 @@ class BPOfferService {
     this.bpServiceToActive = this.bpServices.get(0);
   }
 
+  /**
+   * Validates a refreshed list of NN addresses against the NNs this
+   * service already has actors for.
+   *
+   * @param addrs the NN addresses now configured for this nameservice
+   * @throws IOException if the address set differs from the current one,
+   *         since reconfiguring the NNs of a running DN is not supported
+   */
+  void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+    Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
+    for (BPServiceActor actor : bpServices) {
+      oldAddrs.add(actor.getNNSocketAddress());
+    }
+    Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
+
+    if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
+      // Keep things simple for now -- we can implement this at a later date.
+      throw new IOException(
+          "HA does not currently support adding a new standby to a running DN. " +
+          "Please do a rolling restart of DNs to reconfigure the list of NNs.");
+    }
+  }
+
   /**
    * returns true if BP thread has completed initialization of storage
    * and has registered with the corresponding namenode
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
new file mode 100644
index 0000000000..3176be2078
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Manages the BPOfferService objects for the data node.
+ * Creation, removal, starting, stopping, shutdown on BPOfferService
+ * objects must be done via APIs in this class.
+ */
+@InterfaceAudience.Private
+class BlockPoolManager {
+  private static final Log LOG = DataNode.LOG;
+
+  /** Active services keyed by nameservice ID; guarded by {@code this}. */
+  private final Map<String, BPOfferService> bpByNameserviceId =
+    Maps.newHashMap();
+  /** Registered services keyed by block pool ID; guarded by {@code this}. */
+  private final Map<String, BPOfferService> bpByBlockPoolId =
+    Maps.newHashMap();
+  /** Every service created by this manager; guarded by {@code this}. */
+  private final List<BPOfferService> offerServices =
+    Lists.newArrayList();
+
+  private final DataNode dn;
+
+  //This lock is used only to ensure exclusion of refreshNamenodes
+  private final Object refreshNamenodesLock = new Object();
+
+  BlockPoolManager(DataNode dn) {
+    this.dn = dn;
+  }
+
+  /**
+   * Registers a BPOfferService under its block pool ID once that ID is known.
+   *
+   * @throws IllegalArgumentException if the service was not created by this
+   *         manager or does not have a block pool ID yet
+   */
+  synchronized void addBlockPool(BPOfferService bpos) {
+    Preconditions.checkArgument(offerServices.contains(bpos),
+        "Unknown BPOS: %s", bpos);
+    if (bpos.getBlockPoolId() == null) {
+      throw new IllegalArgumentException("Null blockpool id");
+    }
+    bpByBlockPoolId.put(bpos.getBlockPoolId(), bpos);
+  }
+
+  /**
+   * Returns the array of BPOfferService objects.
+   * Caution: The BPOfferService returned could be shutdown any time.
+   */
+  synchronized BPOfferService[] getAllNamenodeThreads() {
+    BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
+    return offerServices.toArray(bposArray);
+  }
+
+  synchronized BPOfferService get(String bpid) {
+    return bpByBlockPoolId.get(bpid);
+  }
+
+  // TODO(HA) would be good to kill this
+  synchronized BPOfferService get(InetSocketAddress addr) {
+    for (BPOfferService bpos : offerServices) {
+      if (bpos.containsNN(addr)) {
+        return bpos;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Forgets a BPOfferService; logs a warning if it was not registered
+   * under any nameservice ID.
+   */
+  synchronized void remove(BPOfferService t) {
+    offerServices.remove(t);
+    bpByBlockPoolId.remove(t.getBlockPoolId());
+
+    boolean removed = false;
+    for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
+         it.hasNext() && !removed;) {
+      BPOfferService bpos = it.next();
+      if (bpos == t) {
+        it.remove();
+        LOG.info("Removed " + bpos);
+        removed = true;
+      }
+    }
+
+    if (!removed) {
+      LOG.warn("Couldn't remove BPOS " + t + " from bpByNameserviceId map");
+    }
+  }
+
+  void shutDownAll() throws InterruptedException {
+    BPOfferService[] bposArray = this.getAllNamenodeThreads();
+
+    for (BPOfferService bpos : bposArray) {
+      bpos.stop(); //interrupts the threads
+    }
+    //now join
+    for (BPOfferService bpos : bposArray) {
+      bpos.join();
+    }
+  }
+
+  /**
+   * Starts every known BPOfferService as the login user.
+   * NOTE(review): this also calls start() on services started by an
+   * earlier refresh -- confirm BPOfferService.start() tolerates that.
+   *
+   * @throws IOException if starting fails or the thread is interrupted
+   */
+  synchronized void startAll() throws IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(
+          new PrivilegedExceptionAction<Object>() {
+            public Object run() throws Exception {
+              for (BPOfferService bpos : offerServices) {
+                bpos.start();
+              }
+              return null;
+            }
+          });
+    } catch (InterruptedException ex) {
+      // Restore the interrupt status and keep the exception itself as the
+      // cause; ex.getCause() would usually be null and lose the origin.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while starting BPOfferServices", ex);
+    }
+  }
+
+  void joinAll() {
+    for (BPOfferService bpos: this.getAllNamenodeThreads()) {
+      bpos.join();
+    }
+  }
+
+  /**
+   * Reconciles the running BPOfferServices with the nameservices in conf:
+   * new nameservices are started, vanished ones are stopped, and the rest
+   * are asked to refresh their NN list.
+   *
+   * @param conf configuration to read the NN service RPC addresses from
+   * @throws IOException if a service cannot be started or refreshed
+   */
+  void refreshNamenodes(Configuration conf)
+      throws IOException {
+    LOG.info("Refresh request received for nameservices: "
+        + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+
+    Map<String, Map<String, InetSocketAddress>> newAddressMap =
+      DFSUtil.getNNServiceRpcAddresses(conf);
+
+    synchronized (refreshNamenodesLock) {
+      doRefreshNamenodes(newAddressMap);
+    }
+  }
+
+  private void doRefreshNamenodes(
+      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
+    assert Thread.holdsLock(refreshNamenodesLock);
+
+    Set<String> toRefresh = Sets.newHashSet();
+    Set<String> toAdd = Sets.newHashSet();
+    Set<String> toRemove;
+    // Snapshots taken under the lock, so that steps 4 and 5 never read
+    // bpByNameserviceId while another thread is inside remove().
+    List<BPOfferService> bposToStop = Lists.newArrayList();
+    Map<String, BPOfferService> bposToRefresh = Maps.newHashMap();
+
+    synchronized (this) {
+      // Step 1. For each of the new nameservices, figure out whether
+      // it's an update of the set of NNs for an existing NS,
+      // or an entirely new nameservice.
+      for (String nameserviceId : addrMap.keySet()) {
+        if (bpByNameserviceId.containsKey(nameserviceId)) {
+          toRefresh.add(nameserviceId);
+          bposToRefresh.put(nameserviceId,
+              bpByNameserviceId.get(nameserviceId));
+        } else {
+          toAdd.add(nameserviceId);
+        }
+      }
+
+      // Step 2. Any nameservices we currently have but are no longer present
+      // need to be removed.
+      toRemove = Sets.newHashSet(Sets.difference(
+          bpByNameserviceId.keySet(), addrMap.keySet()));
+      for (String nsToRemove : toRemove) {
+        bposToStop.add(bpByNameserviceId.get(nsToRemove));
+      }
+
+      assert toRefresh.size() + toAdd.size() ==
+        addrMap.size() :
+          "toAdd: " + Joiner.on(",").useForNull("").join(toAdd) +
+          " toRemove: " + Joiner.on(",").useForNull("").join(toRemove) +
+          " toRefresh: " + Joiner.on(",").useForNull("").join(toRefresh);
+
+      // Step 3. Start new nameservices
+      if (!toAdd.isEmpty()) {
+        LOG.info("Starting BPOfferServices for nameservices: " +
+            Joiner.on(",").useForNull("").join(toAdd));
+
+        for (String nsToAdd : toAdd) {
+          ArrayList<InetSocketAddress> addrs =
+            Lists.newArrayList(addrMap.get(nsToAdd).values());
+          BPOfferService bpos = createBPOS(addrs);
+          bpByNameserviceId.put(nsToAdd, bpos);
+          offerServices.add(bpos);
+        }
+      }
+      startAll();
+    }
+
+    // Step 4. Shut down old nameservices. This happens outside
+    // of the synchronized(this) lock since they need to call
+    // back to .remove() from another thread
+    if (!toRemove.isEmpty()) {
+      LOG.info("Stopping BPOfferServices for nameservices: " +
+          Joiner.on(",").useForNull("").join(toRemove));
+
+      for (BPOfferService bpos : bposToStop) {
+        bpos.stop();
+        bpos.join();
+        // they will call remove on their own
+      }
+    }
+
+    // Step 5. Update nameservices whose NN list has changed
+    if (!toRefresh.isEmpty()) {
+      LOG.info("Refreshing list of NNs for nameservices: " +
+          Joiner.on(",").useForNull("").join(toRefresh));
+
+      for (String nsToRefresh : toRefresh) {
+        BPOfferService bpos = bposToRefresh.get(nsToRefresh);
+        ArrayList<InetSocketAddress> addrs =
+          Lists.newArrayList(addrMap.get(nsToRefresh).values());
+        bpos.refreshNNList(addrs);
+      }
+    }
+  }
+
+  /**
+   * Extracted out for test purposes.
+   */
+  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+    return new BPOfferService(nnAddrs, dn);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dc3a18163b..b2c974c28b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -48,7 +48,6 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
 
 import java.io.BufferedOutputStream;
@@ -71,12 +70,10 @@
 import java.util.ArrayList;
 import
java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -91,7 +88,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -169,7 +165,6 @@ import org.mortbay.util.ajax.JSON; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -236,163 +231,6 @@ public static InetSocketAddress createSocketAddr(String target) { return NetUtils.createSocketAddr(target); } - /** - * Manages he BPOfferService objects for the data node. - * Creation, removal, starting, stopping, shutdown on BPOfferService - * objects must be done via APIs in this class. 
- */ - @InterfaceAudience.Private - class BlockPoolManager { - private final Map bpMapping; - private final List offerServices; - - //This lock is used only to ensure exclusion of refreshNamenodes - private final Object refreshNamenodesLock = new Object(); - - BlockPoolManager(Configuration conf) - throws IOException { - bpMapping = new HashMap(); - offerServices = new ArrayList(); - - Map> map = - DFSUtil.getNNServiceRpcAddresses(conf); - for (Entry> entry : - map.entrySet()) { - List nnList = Lists.newArrayList(entry.getValue().values()); - BPOfferService bpos = new BPOfferService(nnList, DataNode.this); - offerServices.add(bpos); - } - } - - synchronized void addBlockPool(BPOfferService bpos) { - Preconditions.checkArgument(offerServices.contains(bpos), - "Unknown BPOS: %s", bpos); - if (bpos.getBlockPoolId() == null) { - throw new IllegalArgumentException("Null blockpool id"); - } - LOG.info("===> registering in bpmapping: " + bpos); - bpMapping.put(bpos.getBlockPoolId(), bpos); - } - - /** - * Returns the array of BPOfferService objects. - * Caution: The BPOfferService returned could be shutdown any time. 
- */ - synchronized BPOfferService[] getAllNamenodeThreads() { - BPOfferService[] bposArray = new BPOfferService[offerServices.size()]; - return offerServices.toArray(bposArray); - } - - synchronized BPOfferService get(String bpid) { - return bpMapping.get(bpid); - } - - // TODO(HA) would be good to kill this - synchronized BPOfferService get(InetSocketAddress addr) { - for (BPOfferService bpos : offerServices) { - if (bpos.containsNN(addr)) { - return bpos; - } - } - return null; - } - - synchronized void remove(BPOfferService t) { - offerServices.remove(t); - bpMapping.remove(t.getBlockPoolId()); - } - - void shutDownAll() throws InterruptedException { - BPOfferService[] bposArray = this.getAllNamenodeThreads(); - - for (BPOfferService bpos : bposArray) { - bpos.stop(); //interrupts the threads - } - //now join - for (BPOfferService bpos : bposArray) { - bpos.join(); - } - } - - synchronized void startAll() throws IOException { - try { - UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction() { - public Object run() throws Exception { - for (BPOfferService bpos : offerServices) { - bpos.start(); - } - return null; - } - }); - } catch (InterruptedException ex) { - IOException ioe = new IOException(); - ioe.initCause(ex.getCause()); - throw ioe; - } - } - - void joinAll() { - for (BPOfferService bpos: this.getAllNamenodeThreads()) { - bpos.join(); - } - } - - void refreshNamenodes(Configuration conf) - throws IOException { - throw new UnsupportedOperationException("TODO(HA)"); -/* - * TODO(HA) - - LOG.info("Refresh request received for nameservices: " - + conf.get(DFS_FEDERATION_NAMESERVICES)); - - // TODO(HA): need to update this for multiple NNs per nameservice - // For now, just list all of the NNs into this set - Map> newAddressMap = - DFSUtil.getNNServiceRpcAddresses(conf); - Set newAddresses = Sets.newHashSet(); - for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) { - newAddresses.add(cnn.getAddress()); - } - - List 
toShutdown = new ArrayList(); - List toStart = new ArrayList(); - synchronized (refreshNamenodesLock) { - synchronized (this) { - for (InetSocketAddress nnaddr : offerServices.keySet()) { - if (!(newAddresses.contains(nnaddr))) { - toShutdown.add(offerServices.get(nnaddr)); - } - } - for (InetSocketAddress nnaddr : newAddresses) { - if (!(offerServices.containsKey(nnaddr))) { - toStart.add(nnaddr); - } - } - - for (InetSocketAddress nnaddr : toStart) { - BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); - offerServices.put(bpos.getNNSocketAddress(), bpos); - } - } - - for (BPOfferService bpos : toShutdown) { - bpos.stop(); - bpos.join(); - } - - // stoping the BPOSes causes them to call remove() on their own when they - // clean up. - - // Now start the threads that are not already running. - startAll(); - } - */ - } - - } - volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; public volatile FSDatasetInterface data = null; @@ -779,7 +617,8 @@ void startDataNode(Configuration conf, metrics = DataNodeMetrics.create(conf, getMachineName()); - blockPoolManager = new BlockPoolManager(conf); + blockPoolManager = new BlockPoolManager(this); + blockPoolManager.refreshNamenodes(conf); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java new file mode 100644 index 0000000000..c0301ac814 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Checks the create/stop/refresh calls BlockPoolManager makes on refresh. */
+public class TestBlockPoolManager {
+  private static final Log LOG = LogFactory.getLog(TestBlockPoolManager.class);
+  private final DataNode mockDN = Mockito.mock(DataNode.class);
+  private BlockPoolManager bpm;
+  private final StringBuilder log = new StringBuilder();
+  private int mockIdx = 1;
+
+  @Before
+  public void setupBPM() {
+    bpm = new BlockPoolManager(mockDN){
+
+      @Override
+      protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+        final int idx = mockIdx++;
+        doLog("create #" + idx);
+        final BPOfferService bpos = Mockito.mock(BPOfferService.class);
+        Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
+        // Log refreshes
+        try {
+          Mockito.doAnswer(
+              new Answer<Void>() {
+                @Override
+                public Void answer(InvocationOnMock invocation) throws Throwable {
+                  doLog("refresh #" + idx);
+                  return null;
+                }
+              }).when(bpos).refreshNNList(
+                  Mockito.<ArrayList<InetSocketAddress>>any());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        // Log stops
+        Mockito.doAnswer(
+            new Answer<Void>() {
+              @Override
+              public Void answer(InvocationOnMock invocation) throws Throwable {
+                doLog("stop #" + idx);
+                bpm.remove(bpos);
+                return null;
+              }
+            }).when(bpos).stop();
+        return bpos;
+      }
+    };
+  }
+
+  private void doLog(String string) {
+    synchronized(log) {
+      LOG.info(string);
+      log.append(string).append("\n");
+    }
+  }
+
+  @Test
+  public void testSimpleSingleNS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY,
+        "hdfs://mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals("create #1\n", log.toString());
+  }
+
+  @Test
+  public void testFederationRefresh() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    addNN(conf, "ns1", "mock1:8020");
+    addNN(conf, "ns2", "mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #1\n" +
+        "create #2\n", log.toString());
+    log.setLength(0);
+
+    // Drop ns2, keeping only ns1 (the expected log implies ns2 got BPOS #1)
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "stop #1\n" +
+        "refresh #2\n", log.toString());
+    log.setLength(0);
+
+    // Add back an NS -- this creates a new BPOS since the old
+    // one for ns2 should have been previously retired
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #3\n" +
+        "refresh #2\n", log.toString());
+  }
+
+  private static void addNN(Configuration conf, String ns, String addr) {
+    String key = DFSUtil.addKeySuffixes(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);
+    conf.set(key, addr);
+  }
+}