HDFS-2612. Handle refreshNameNodes in federated HA clusters. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1209249 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-12-01 21:26:08 +00:00
parent 4cbead8484
commit f4fa76719e
5 changed files with 412 additions and 163 deletions


@@ -35,3 +35,5 @@ HDFS-1971. Send block report from datanode to both active and standby namenodes.
HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh)
HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)


@@ -19,7 +19,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
@@ -42,6 +44,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* One instance per block-pool/namespace on the DN, which handles the
@@ -89,6 +93,21 @@ class BPOfferService {
this.bpServiceToActive = this.bpServices.get(0);
}
void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
for (BPServiceActor actor : bpServices) {
oldAddrs.add(actor.getNNSocketAddress());
}
Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
// Keep things simple for now -- we can implement this at a later date.
throw new IOException(
"HA does not currently support adding a new standby to a running DN. " +
"Please do a rolling restart of DNs to reconfigure the list of NNs.");
}
}
/**
* returns true if BP thread has completed initialization of storage
* and has registered with the corresponding namenode


@@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Manages the BPOfferService objects for the data node.
* Creation, removal, starting, stopping, shutdown on BPOfferService
* objects must be done via APIs in this class.
*/
@InterfaceAudience.Private
class BlockPoolManager {
private static final Log LOG = DataNode.LOG;
private final Map<String, BPOfferService> bpByNameserviceId =
Maps.newHashMap();
private final Map<String, BPOfferService> bpByBlockPoolId =
Maps.newHashMap();
private final List<BPOfferService> offerServices =
Lists.newArrayList();
private final DataNode dn;
//This lock is used only to ensure exclusion of refreshNamenodes
private final Object refreshNamenodesLock = new Object();
BlockPoolManager(DataNode dn) {
this.dn = dn;
}
synchronized void addBlockPool(BPOfferService bpos) {
Preconditions.checkArgument(offerServices.contains(bpos),
"Unknown BPOS: %s", bpos);
if (bpos.getBlockPoolId() == null) {
throw new IllegalArgumentException("Null blockpool id");
}
bpByBlockPoolId.put(bpos.getBlockPoolId(), bpos);
}
/**
* Returns the array of BPOfferService objects.
* Caution: The BPOfferService returned could be shutdown any time.
*/
synchronized BPOfferService[] getAllNamenodeThreads() {
BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
return offerServices.toArray(bposArray);
}
synchronized BPOfferService get(String bpid) {
return bpByBlockPoolId.get(bpid);
}
// TODO(HA) would be good to kill this
synchronized BPOfferService get(InetSocketAddress addr) {
for (BPOfferService bpos : offerServices) {
if (bpos.containsNN(addr)) {
return bpos;
}
}
return null;
}
synchronized void remove(BPOfferService t) {
offerServices.remove(t);
bpByBlockPoolId.remove(t.getBlockPoolId());
boolean removed = false;
for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
it.hasNext() && !removed;) {
BPOfferService bpos = it.next();
if (bpos == t) {
it.remove();
LOG.info("Removed " + bpos);
removed = true;
}
}
if (!removed) {
LOG.warn("Couldn't remove BPOS " + t + " from bpByNameserviceId map");
}
}
void shutDownAll() throws InterruptedException {
BPOfferService[] bposArray = this.getAllNamenodeThreads();
for (BPOfferService bpos : bposArray) {
bpos.stop(); //interrupts the threads
}
//now join
for (BPOfferService bpos : bposArray) {
bpos.join();
}
}
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
for (BPOfferService bpos : offerServices) {
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
void joinAll() {
for (BPOfferService bpos: this.getAllNamenodeThreads()) {
bpos.join();
}
}
void refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap);
}
}
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newHashSet();
Set<String> toAdd = Sets.newHashSet();
Set<String> toRemove;
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
assert toRefresh.size() + toAdd.size() ==
addrMap.size() :
"toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
" toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
" toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toAdd));
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
BPOfferService bpos = createBPOS(addrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
startAll();
}
// Step 4. Shut down old nameservices. This happens outside
// of the synchronized(this) lock since they need to call
// back to .remove() from another thread
if (!toRemove.isEmpty()) {
LOG.info("Stopping BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toRemove));
for (String nsToRemove : toRemove) {
BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
bpos.stop();
bpos.join();
// they will call remove on their own
}
}
// Step 5. Update nameservices whose NN list has changed
if (!toRefresh.isEmpty()) {
LOG.info("Refreshing list of NNs for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toRefresh));
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToRefresh).values());
bpos.refreshNNList(addrs);
}
}
}
/**
* Extracted out for test purposes.
*/
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
return new BPOfferService(nnAddrs, dn);
}
}
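The core of doRefreshNamenodes() above is the three-way split of nameservice IDs into toRefresh, toAdd and toRemove. Below is a minimal, self-contained sketch of that partition with made-up nameservice IDs; the patch derives toRefresh/toAdd with an explicit loop, but the set algebra here yields the same partition.

import java.util.Set;

import com.google.common.collect.Sets;

public class RefreshPartitionSketch {
  public static void main(String[] args) {
    // Nameservices the DN currently runs a BPOfferService for
    // (the key set of bpByNameserviceId).
    Set<String> current = Sets.newHashSet("ns1", "ns2");

    // Nameservices named by the re-read configuration
    // (the key set returned by DFSUtil.getNNServiceRpcAddresses(conf)).
    Set<String> fromConf = Sets.newHashSet("ns2", "ns3");

    Set<String> toRefresh = Sets.intersection(fromConf, current); // {ns2}: refresh its NN list
    Set<String> toAdd     = Sets.difference(fromConf, current);   // {ns3}: start a new BPOfferService
    Set<String> toRemove  = Sets.difference(current, fromConf);   // {ns1}: stop, then remove()

    // Every configured nameservice is either refreshed or added, which is
    // what the assertion toRefresh.size() + toAdd.size() == addrMap.size() checks.
    System.out.println("refresh=" + toRefresh + " add=" + toAdd + " remove=" + toRemove);
  }
}

Note also that the toRemove shutdowns happen outside synchronized(this): BPOfferService.stop() eventually calls back into the synchronized remove() from another thread, so holding the monitor there would deadlock.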


@@ -48,7 +48,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
import java.io.BufferedOutputStream;
@@ -71,12 +70,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -91,7 +88,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -169,7 +165,6 @@
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -236,163 +231,6 @@ public static InetSocketAddress createSocketAddr(String target) {
return NetUtils.createSocketAddr(target);
}
/**
* Manages the BPOfferService objects for the data node.
* Creation, removal, starting, stopping, shutdown on BPOfferService
* objects must be done via APIs in this class.
*/
@InterfaceAudience.Private
class BlockPoolManager {
private final Map<String, BPOfferService> bpMapping;
private final List<BPOfferService> offerServices;
//This lock is used only to ensure exclusion of refreshNamenodes
private final Object refreshNamenodesLock = new Object();
BlockPoolManager(Configuration conf)
throws IOException {
bpMapping = new HashMap<String, BPOfferService>();
offerServices = new ArrayList<BPOfferService>();
Map<String, Map<String, InetSocketAddress>> map =
DFSUtil.getNNServiceRpcAddresses(conf);
for (Entry<String, Map<String, InetSocketAddress>> entry :
map.entrySet()) {
List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values());
BPOfferService bpos = new BPOfferService(nnList, DataNode.this);
offerServices.add(bpos);
}
}
synchronized void addBlockPool(BPOfferService bpos) {
Preconditions.checkArgument(offerServices.contains(bpos),
"Unknown BPOS: %s", bpos);
if (bpos.getBlockPoolId() == null) {
throw new IllegalArgumentException("Null blockpool id");
}
LOG.info("===> registering in bpmapping: " + bpos);
bpMapping.put(bpos.getBlockPoolId(), bpos);
}
/**
* Returns the array of BPOfferService objects.
* Caution: The BPOfferService returned could be shutdown any time.
*/
synchronized BPOfferService[] getAllNamenodeThreads() {
BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
return offerServices.toArray(bposArray);
}
synchronized BPOfferService get(String bpid) {
return bpMapping.get(bpid);
}
// TODO(HA) would be good to kill this
synchronized BPOfferService get(InetSocketAddress addr) {
for (BPOfferService bpos : offerServices) {
if (bpos.containsNN(addr)) {
return bpos;
}
}
return null;
}
synchronized void remove(BPOfferService t) {
offerServices.remove(t);
bpMapping.remove(t.getBlockPoolId());
}
void shutDownAll() throws InterruptedException {
BPOfferService[] bposArray = this.getAllNamenodeThreads();
for (BPOfferService bpos : bposArray) {
bpos.stop(); //interrupts the threads
}
//now join
for (BPOfferService bpos : bposArray) {
bpos.join();
}
}
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
for (BPOfferService bpos : offerServices) {
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
void joinAll() {
for (BPOfferService bpos: this.getAllNamenodeThreads()) {
bpos.join();
}
}
void refreshNamenodes(Configuration conf)
throws IOException {
throw new UnsupportedOperationException("TODO(HA)");
/*
* TODO(HA)
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFS_FEDERATION_NAMESERVICES));
// TODO(HA): need to update this for multiple NNs per nameservice
// For now, just list all of the NNs into this set
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);
Set<InetSocketAddress> newAddresses = Sets.newHashSet();
for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) {
newAddresses.add(cnn.getAddress());
}
List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
synchronized (refreshNamenodesLock) {
synchronized (this) {
for (InetSocketAddress nnaddr : offerServices.keySet()) {
if (!(newAddresses.contains(nnaddr))) {
toShutdown.add(offerServices.get(nnaddr));
}
}
for (InetSocketAddress nnaddr : newAddresses) {
if (!(offerServices.containsKey(nnaddr))) {
toStart.add(nnaddr);
}
}
for (InetSocketAddress nnaddr : toStart) {
BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
offerServices.put(bpos.getNNSocketAddress(), bpos);
}
}
for (BPOfferService bpos : toShutdown) {
bpos.stop();
bpos.join();
}
// stopping the BPOSes causes them to call remove() on their own when they
// clean up.
// Now start the threads that are not already running.
startAll();
}
*/
}
}
volatile boolean shouldRun = true;
private BlockPoolManager blockPoolManager;
public volatile FSDatasetInterface data = null;
@@ -779,7 +617,8 @@ void startDataNode(Configuration conf,
metrics = DataNodeMetrics.create(conf, getMachineName());
blockPoolManager = new BlockPoolManager(conf);
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
}
/**


@@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestBlockPoolManager {
private Log LOG = LogFactory.getLog(TestBlockPoolManager.class);
private DataNode mockDN = Mockito.mock(DataNode.class);
private BlockPoolManager bpm;
private StringBuilder log = new StringBuilder();
private int mockIdx = 1;
@Before
public void setupBPM() {
bpm = new BlockPoolManager(mockDN){
@Override
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
final int idx = mockIdx++;
doLog("create #" + idx);
final BPOfferService bpos = Mockito.mock(BPOfferService.class);
Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
// Log refreshes
try {
Mockito.doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
doLog("refresh #" + idx);
return null;
}
}).when(bpos).refreshNNList(
Mockito.<ArrayList<InetSocketAddress>>any());
} catch (IOException e) {
throw new RuntimeException(e);
}
// Log stops
Mockito.doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
doLog("stop #" + idx);
bpm.remove(bpos);
return null;
}
}).when(bpos).stop();
return bpos;
}
};
}
private void doLog(String string) {
synchronized(log) {
LOG.info(string);
log.append(string).append("\n");
}
}
@Test
public void testSimpleSingleNS() throws Exception {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY,
"hdfs://mock1:8020");
bpm.refreshNamenodes(conf);
assertEquals("create #1\n", log.toString());
}
@Test
public void testFederationRefresh() throws Exception {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
"ns1,ns2");
addNN(conf, "ns1", "mock1:8020");
addNN(conf, "ns2", "mock1:8020");
bpm.refreshNamenodes(conf);
assertEquals(
"create #1\n" +
"create #2\n", log.toString());
log.setLength(0);
// Remove the first NS
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
"ns1");
bpm.refreshNamenodes(conf);
assertEquals(
"stop #1\n" +
"refresh #2\n", log.toString());
log.setLength(0);
// Add back an NS -- this creates a new BPOS since the old
// one for ns2 should have been previously retired
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
"ns1,ns2");
bpm.refreshNamenodes(conf);
assertEquals(
"create #3\n" +
"refresh #2\n", log.toString());
}
private static void addNN(Configuration conf, String ns, String addr) {
String key = DFSUtil.addKeySuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);
conf.set(key, addr);
}
}
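TestBlockPoolManager above drives federation with a single NN per nameservice. For the federated HA case named in the commit summary, each nameservice would carry an active/standby pair. A rough sketch of such a configuration follows; the host names and nameservice IDs are invented, and the key names assume the usual dfs.ha.namenodes / dfs.namenode.rpc-address.<ns>.<nn> conventions rather than anything introduced by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class FederatedHaConfSketch {
  public static Configuration build() {
    Configuration conf = new Configuration();
    // Two federated nameservices.
    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
    // Each nameservice has an active/standby pair of NameNodes.
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "ns1-a.example.com:8020");
    conf.set("dfs.namenode.rpc-address.ns1.nn2", "ns1-b.example.com:8020");
    conf.set("dfs.ha.namenodes.ns2", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.ns2.nn1", "ns2-a.example.com:8020");
    conf.set("dfs.namenode.rpc-address.ns2.nn2", "ns2-b.example.com:8020");
    return conf;
  }
}

Feeding such a Configuration to BlockPoolManager.refreshNamenodes() would create one BPOfferService per nameservice, each tracking two NN addresses; later refreshes may add or drop whole nameservices, but changing the NN set inside a running nameservice still throws, per refreshNNList() above.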