diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 6b67be47f0..763245d69a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -45,3 +45,5 @@ HDFS-2626. BPOfferService.verifyAndSetNamespaceInfo needs to be synchronized (to HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop ProtocolTranslators (todd) HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse type (todd) + +HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index eb611bffcb..d750d8587c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -37,14 +37,15 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.ipc.RPC; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** @@ -75,10 +76,31 @@ class BPOfferService { UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; - private BPServiceActor bpServiceToActive; + /** + * A reference to the BPServiceActor associated with the currently + * ACTIVE NN. In the case that all NameNodes are in STANDBY mode, + * this can be null. If non-null, this must always refer to a member + * of the {@link #bpServices} list. + */ + private BPServiceActor bpServiceToActive = null; + + /** + * The list of all actors for namenodes in this nameservice, regardless + * of their active or standby states. + */ private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>(); + /** + * Each time we receive a heartbeat from a NN claiming to be ACTIVE, + * we record that NN's most recent transaction ID here, so long as it + * is more recent than the previous value. This allows us to detect + * split-brain scenarios in which a prior NN is still asserting its + * ACTIVE state but with a too-low transaction ID. See HDFS-2627 + * for details. + */ + private long lastActiveClaimTxId = -1; + BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) { Preconditions.checkArgument(!nnAddrs.isEmpty(), "Must pass at least one NN."); @@ -87,10 +109,6 @@ class BPOfferService { for (InetSocketAddress addr : nnAddrs) { this.bpServices.add(new BPServiceActor(addr, this)); } - // TODO(HA): currently we just make the first one the initial - // active. In reality it should start in an unknown state and then - // as we figure out which is active, designate one as such. - this.bpServiceToActive = this.bpServices.get(0); }
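The hunk above replaces the hard-coded assumption that the first configured NN is active with a "discover the active NN from heartbeats" model. A minimal, self-contained sketch of the invariant the new javadoc states (bpServiceToActive is either null or a member of bpServices) follows; all names here are invented for illustration, this is not patch code:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import com.google.common.base.Preconditions;

    class ActiveActorInvariantSketch {
      private final List<Object> actors = new CopyOnWriteArrayList<Object>();
      private Object active = null; // null until some NN proves it is ACTIVE

      synchronized void setActive(Object candidate) {
        // The active reference must be null or point into the actor list.
        Preconditions.checkArgument(
            candidate == null || actors.contains(candidate),
            "active must be null or a member of the actor list");
        active = candidate;
      }

      synchronized Object getActive() {
        return active; // callers must tolerate null (the all-STANDBY case)
      }
    }

Accordingly, callers such as getActiveNN() later in this file now have to handle the null, "no active known", case explicitly.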
void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException { @@ -109,19 +127,23 @@ void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException { } /** - * returns true if BP thread has completed initialization of storage - * and has registered with the corresponding namenode - * @return true if initialized + * @return true if the service has registered with at least one NameNode. */ boolean isInitialized() { - // TODO(HA) is this right? - return bpServiceToActive != null && bpServiceToActive.isInitialized(); + return bpRegistration != null; } + /** + * @return true if there is at least one actor thread running which is + * talking to a NameNode. + */ boolean isAlive() { - // TODO: should || all the bp actors probably? - return bpServiceToActive != null && - bpServiceToActive.isAlive(); + for (BPServiceActor actor : bpServices) { + if (actor.isAlive()) { + return true; + } + } + return false; } String getBlockPoolId() { @@ -322,7 +344,7 @@ DatanodeRegistration createRegistration() { * Called when an actor shuts down. If this is the last actor * to shut down, shuts down the whole blockpool in the DN. */ - void shutdownActor(BPServiceActor actor) { + synchronized void shutdownActor(BPServiceActor actor) { if (bpServiceToActive == actor) { bpServiceToActive = null; } @@ -339,7 +361,7 @@ void shutdownActor(BPServiceActor actor) { } @Deprecated - InetSocketAddress getNNSocketAddress() { + synchronized InetSocketAddress getNNSocketAddress() { // TODO(HA) this doesn't make sense anymore return bpServiceToActive.getNNSocketAddress(); } @@ -383,8 +405,61 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) { * @return a proxy to the active NN */ @Deprecated - DatanodeProtocol getActiveNN() { - return bpServiceToActive.bpNamenode; + synchronized DatanodeProtocol getActiveNN() { + if (bpServiceToActive != null) { + return bpServiceToActive.bpNamenode; + } else { + return null; + } + } + + /** + * Update the BPOS's view of which NN is active, based on a heartbeat + * response from one of the actors. + * + * @param actor the actor which received the heartbeat + * @param nnHaState the HA-related heartbeat contents + */ + synchronized void updateActorStatesFromHeartbeat( + BPServiceActor actor, + NNHAStatusHeartbeat nnHaState) { + final long txid = nnHaState.getTxId(); + + final boolean nnClaimsActive = + nnHaState.getState() == NNHAStatusHeartbeat.State.ACTIVE; + final boolean bposThinksActive = bpServiceToActive == actor; + final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; + + if (nnClaimsActive && !bposThinksActive) { + LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " + + "txid=" + txid); + if (!isMoreRecentClaim) { + // Split-brain scenario - an NN is trying to claim active + // state when a different NN has already claimed it with a higher + // txid.
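For readers following this hunk: the guard above is the core of HDFS-2627. Distilled into a standalone sketch (hypothetical names; the real logic is updateActorStatesFromHeartbeat as shown in this diff), the arbitration rule is:

    enum HAState { ACTIVE, STANDBY }

    class ActiveClaimArbiter {
      private Object active = null;          // accepted ACTIVE peer, if any
      private long lastActiveClaimTxId = -1;

      synchronized void onHeartbeat(Object nn, HAState state, long txid) {
        boolean claimsActive = (state == HAState.ACTIVE);
        boolean thinksActive = (nn == active);

        if (claimsActive && !thinksActive) {
          if (txid > lastActiveClaimTxId) {
            active = nn;                     // strictly newer claim wins
          }                                  // else: stale claim, ignored
        } else if (!claimsActive && thinksActive) {
          active = null;                     // the accepted active stepped down
        }
        if (active == nn) {
          lastActiveClaimTxId = txid;        // ratchets only while nn is accepted
        }
      }

      synchronized Object getActive() {
        return active;
      }
    }

Note that relinquishing (ACTIVE to STANDBY) clears the active reference but deliberately leaves lastActiveClaimTxId in place, which is what prevents a stale NN from reclaiming the DN afterwards. The hunk continues: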
+ LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" + + txid + " but there was already a more recent claim at txid=" + + lastActiveClaimTxId); + return; + } else { + if (bpServiceToActive == null) { + LOG.info("Acknowledging ACTIVE Namenode " + actor); + } else { + LOG.info("Namenode " + actor + " taking over ACTIVE state from " + + bpServiceToActive + " at higher txid=" + txid); + } + bpServiceToActive = actor; + } + } else if (!nnClaimsActive && bposThinksActive) { + LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " + + "txid=" + nnHaState.getTxId()); + bpServiceToActive = null; + } + + if (bpServiceToActive == actor) { + assert txid >= lastActiveClaimTxId; + lastActiveClaimTxId = txid; + } } /** @@ -415,7 +490,17 @@ void triggerBlockReportForTests() throws IOException { } } - boolean processCommandFromActor(DatanodeCommand cmd, + /** + * Run an immediate heartbeat from all actors. Used by tests. + */ + @VisibleForTesting + void triggerHeartbeatForTests() throws IOException { + for (BPServiceActor actor : bpServices) { + actor.triggerHeartbeatForTests(); + } + } + + synchronized boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException { assert bpServices.contains(actor); if (actor == bpServiceToActive) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index bf49cc0a6b..f6537fa453 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -284,6 +284,14 @@ void triggerBlockReportForTests() throws IOException { lastBlockReport = 0; blockReport(); } + + @VisibleForTesting + void triggerHeartbeatForTests() throws IOException { + synchronized (receivedAndDeletedBlockList) { + lastHeartbeat = 0; + receivedAndDeletedBlockList.notifyAll(); + } + } /** * Report the list blocks to the Namenode @@ -420,8 +428,18 @@ private void offerService() throws Exception { lastHeartbeat = startTime; if (!dn.areHeartbeatsDisabledForTests()) { HeartbeatResponse resp = sendHeartBeat(); + assert resp != null; dn.getMetrics().addHeartbeat(now() - startTime); + // If the state of this NN has changed (eg STANDBY->ACTIVE) + // then let the BPOfferService update itself. + // + // Important that this happens before processCommand below, + // since the first heartbeat to a new active might have commands + // that we should actually process. 
+ bpos.updateActorStatesFromHeartbeat( + this, resp.getNameNodeHaState()); + long startProcessCommands = now(); if (!processCommand(resp.getCommands())) continue; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 09b6634dab..88fa9964fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -150,11 +150,16 @@ import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage; import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage; import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage; +import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; +import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; +import org.apache.hadoop.hdfs.server.namenode.ha.HAState; +import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState; import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -308,6 +313,12 @@ private static final void logAuditEvent(UserGroupInformation ugi, * Used when this NN is in standby state to read from the shared edit log. */ private EditLogTailer editLogTailer = null; + + /** + * Reference to the NN's HAContext object. This is only set once + * {@link #startCommonServices(Configuration, HAContext)} is called. 
+ */ + private HAContext haContext; PendingDataNodeMessages getPendingDataNodeMessages() { return pendingDatanodeMessages; @@ -434,11 +445,13 @@ void stopSecretManager() { /** * Start services common to both active and standby states + * @param haContext * @throws IOException */ - void startCommonServices(Configuration conf) throws IOException { + void startCommonServices(Configuration conf, HAContext haContext) throws IOException { this.registerMBean(); // register the MBean for the FSNamesystemState writeLock(); + this.haContext = haContext; try { nnResourceChecker = new NameNodeResourceChecker(conf); checkAvailableResources(); @@ -2706,12 +2719,28 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, cmds = new DatanodeCommand[] {cmd}; } } - return new HeartbeatResponse(cmds); + + return new HeartbeatResponse(cmds, createHaStatusHeartbeat()); } finally { readUnlock(); } } + private NNHAStatusHeartbeat createHaStatusHeartbeat() { + HAState state = haContext.getState(); + NNHAStatusHeartbeat.State hbState; + if (state instanceof ActiveState) { + hbState = NNHAStatusHeartbeat.State.ACTIVE; + } else if (state instanceof StandbyState) { + hbState = NNHAStatusHeartbeat.State.STANDBY; + } else { + throw new AssertionError("Invalid state: " + state.getClass()); + } + return new NNHAStatusHeartbeat(hbState, + Math.max(getFSImage().getLastAppliedTxId(), + getFSImage().getEditLog().getLastWrittenTxId())); + } + /** * Returns whether or not there were available resources at the last check of * resources. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index b05b9f10bd..fca815fdea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -426,7 +426,7 @@ protected void validateConfigurationSettings(final Configuration conf) /** Start the services common to active and standby states */ private void startCommonServices(Configuration conf) throws IOException { - namesystem.startCommonServices(conf); + namesystem.startCommonServices(conf, haContext); startHttpServer(conf); rpcServer.start(); plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java index fb1a533afc..96f74a0c79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java @@ -35,17 +35,26 @@ public class HeartbeatResponse implements Writable { /** Commands returned from the namenode to the datanode */ private DatanodeCommand[] commands; + /** Information about the current HA-related state of the NN */ + private NNHAStatusHeartbeat haStatus; + public HeartbeatResponse() { // Empty constructor required for Writable } - public HeartbeatResponse(DatanodeCommand[] cmds) { + public HeartbeatResponse(DatanodeCommand[] cmds, + NNHAStatusHeartbeat haStatus) { commands = cmds; + this.haStatus = haStatus; } public DatanodeCommand[] getCommands() { return commands; } + + public NNHAStatusHeartbeat getNameNodeHaState() { + 
return haStatus; + } /////////////////////////////////////////// // Writable @@ -58,6 +67,7 @@ public void write(DataOutput out) throws IOException { ObjectWritable.writeObject(out, commands[i], commands[i].getClass(), null, true); } + haStatus.write(out); } @Override @@ -69,5 +79,7 @@ public void readFields(DataInput in) throws IOException { commands[i] = (DatanodeCommand) ObjectWritable.readObject(in, objectWritable, null); } + haStatus = new NNHAStatusHeartbeat(); + haStatus.readFields(in); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java new file mode 100644 index 0000000000..633aa850df --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NNHAStatusHeartbeat.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class NNHAStatusHeartbeat implements Writable { + + private State state; + private long txid = HdfsConstants.INVALID_TXID; + + public NNHAStatusHeartbeat() { + } + + public NNHAStatusHeartbeat(State state, long txid) { + this.state = state; + this.txid = txid; + } + + public State getState() { + return state; + } + + public long getTxId() { + return txid; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeEnum(out, state); + out.writeLong(txid); + } + + @Override + public void readFields(DataInput in) throws IOException { + state = WritableUtils.readEnum(in, State.class); + txid = in.readLong(); + } + + @InterfaceAudience.Private + public enum State { + ACTIVE, + STANDBY; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java index f7fe3db7b7..e32c3b126c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/HeartbeatResponseWritable.java @@ -31,6 +31,7 @@ @InterfaceStability.Evolving public class HeartbeatResponseWritable implements Writable { private DatanodeCommandWritable[] commands; + private NNHAStatusHeartbeatWritable haStatus; public HeartbeatResponseWritable() { // Empty constructor for Writable @@ -41,7 +42,8 @@ public HeartbeatResponseWritable(DatanodeCommandWritable[] cmds) { } public HeartbeatResponse convert() { - return new HeartbeatResponse(DatanodeCommandWritable.convert(commands)); + return new HeartbeatResponse(DatanodeCommandWritable.convert(commands), + NNHAStatusHeartbeatWritable.convert(haStatus)); } /////////////////////////////////////////// @@ -55,6 +57,7 @@ public void write(DataOutput out) throws IOException { ObjectWritable.writeObject(out, commands[i], commands[i].getClass(), null, true); } + haStatus.write(out); } @Override @@ -66,6 +69,8 @@ public void readFields(DataInput in) throws IOException { commands[i] = (DatanodeCommandWritable) ObjectWritable.readObject(in, objectWritable, null); } + haStatus = new NNHAStatusHeartbeatWritable(); + haStatus.readFields(in); } public static HeartbeatResponseWritable convert( @@ -73,4 +78,4 @@ public static HeartbeatResponseWritable convert( return new HeartbeatResponseWritable(DatanodeCommandWritable.convert(resp .getCommands())); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java new file mode 100644 index 0000000000..44ba33f54d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/NNHAStatusHeartbeatWritable.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +/** + * Response to {@link DatanodeProtocol#sendHeartbeat} + */ +public class NNHAStatusHeartbeatWritable implements Writable { + + private State state; + private long txid = HdfsConstants.INVALID_TXID; + + public NNHAStatusHeartbeatWritable() { + } + + public NNHAStatusHeartbeatWritable(State state, long txid) { + this.state = state; + this.txid = txid; + } + + public State getState() { + return state; + } + + public long getTxId() { + return txid; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeEnum(out, state); + out.writeLong(txid); + } + + @Override + public void readFields(DataInput in) throws IOException { + state = WritableUtils.readEnum(in, State.class); + txid = in.readLong(); + } + + public static NNHAStatusHeartbeat convert( + NNHAStatusHeartbeatWritable haStatus) { + return new NNHAStatusHeartbeat(haStatus.getState(), haStatus.getTxId()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index de26891f95..144b5c2aa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Map; import org.apache.commons.logging.Log; @@ -32,9 +33,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -43,6 +47,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -63,13 +69,15 @@ public class TestBPOfferService { private DatanodeProtocol mockNN1; private DatanodeProtocol mockNN2; + private NNHAStatusHeartbeat[] 
mockHaStatuses = new NNHAStatusHeartbeat[2]; + private int heartbeatCounts[] = new int[2]; private DataNode mockDn; private FSDatasetInterface mockFSDataset; @Before public void setupMocks() throws Exception { - mockNN1 = setupNNMock(); - mockNN2 = setupNNMock(); + mockNN1 = setupNNMock(0); + mockNN2 = setupNNMock(1); // Set up a mock DN with the bare-bones configuration // objects, etc. @@ -92,14 +100,17 @@ public void setupMocks() throws Exception { /** * Set up a mock NN with the bare minimum for a DN to register to it. */ - private DatanodeProtocol setupNNMock() throws Exception { + private DatanodeProtocol setupNNMock(int nnIdx) throws Exception { DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class); Mockito.doReturn( new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0, HdfsConstants.LAYOUT_VERSION)) .when(mock).versionRequest(); - Mockito.doReturn(new HeartbeatResponse(null)) + Mockito.doReturn(new DatanodeRegistration("fake-node")) + .when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class)); + + Mockito.doAnswer(new HeartbeatAnswer(nnIdx)) .when(mock).sendHeartbeat( Mockito.any(DatanodeRegistration.class), Mockito.anyLong(), @@ -109,10 +120,31 @@ private DatanodeProtocol setupNNMock() throws Exception { Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt()); - + mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0); return mock; } + /** + * Mock answer for heartbeats which returns an empty set of commands + * and the HA status for the chosen NN from the + * {@link TestBPOfferService#mockHaStatuses} array. + */ + private class HeartbeatAnswer implements Answer<HeartbeatResponse> { + private final int nnIdx; + + public HeartbeatAnswer(int nnIdx) { + this.nnIdx = nnIdx; + } + + @Override + public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { + heartbeatCounts[nnIdx]++; + return new HeartbeatResponse(new DatanodeCommand[0], + mockHaStatuses[nnIdx]); + } + } + + /** * Test that the BPOS can register to talk to two different NNs, * sends block reports to both, etc. @@ -204,6 +236,53 @@ public void testNNsFromDifferentClusters() throws Exception { bpos.stop(); } } + + /** + * Test that the DataNode determines the active NameNode correctly + * based on the HA-related information in heartbeat responses. + * See HDFS-2627. + */ + @Test + public void testPickActiveNameNode() throws Exception { + BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2); + bpos.start(); + try { + waitForInitialization(bpos); + + // Should start with neither NN as active. + assertNull(bpos.getActiveNN()); + + // Have NN1 claim active at txid 1 + mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1); + waitForHeartbeats(bpos); + assertSame(mockNN1, bpos.getActiveNN()); + + // NN2 claims active at a higher txid + mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2); + waitForHeartbeats(bpos); + assertSame(mockNN2, bpos.getActiveNN()); + + // Even after another heartbeat from the first NN, it should + // think NN2 is active, since it claimed a higher txid + waitForHeartbeats(bpos); + assertSame(mockNN2, bpos.getActiveNN()); + + // Even if NN2 goes to standby, DN shouldn't reset to talking to NN1, + // because NN1's txid is lower than the last active txid. Instead, + // it should consider neither active. + mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2); + waitForHeartbeats(bpos); + assertNull(bpos.getActiveNN()); + + // Now if NN1 goes back to a higher txid, it should be considered active + mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3); + waitForHeartbeats(bpos); + assertSame(mockNN1, bpos.getActiveNN()); + + } finally { + bpos.stop(); + } + }
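Traced against the ActiveClaimArbiter sketch given earlier (same caveat: illustrative names only, not patch code), the scenario this test walks through is:

    Object nn1 = new Object(), nn2 = new Object();
    ActiveClaimArbiter a = new ActiveClaimArbiter();
    a.onHeartbeat(nn1, HAState.ACTIVE, 1);   // nn1 accepted; last claim txid = 1
    a.onHeartbeat(nn2, HAState.ACTIVE, 2);   // nn2 outbids nn1; last claim txid = 2
    a.onHeartbeat(nn1, HAState.ACTIVE, 1);   // stale (1 <= 2): ignored
    a.onHeartbeat(nn2, HAState.STANDBY, 2);  // nn2 steps down: no active at all
    a.onHeartbeat(nn1, HAState.ACTIVE, 1);   // still stale: DN keeps no active
    a.onHeartbeat(nn1, HAState.ACTIVE, 3);   // fresh claim (3 > 2): nn1 active again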
private void waitForOneToFail(final BPOfferService bpos) throws Exception { @@ -269,6 +348,30 @@ public Boolean get() { }, 500, 10000); } + private void waitForHeartbeats(BPOfferService bpos) + throws Exception { + final int countAtStart[]; + synchronized (heartbeatCounts) { + countAtStart = Arrays.copyOf( + heartbeatCounts, heartbeatCounts.length); + } + bpos.triggerHeartbeatForTests(); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (heartbeatCounts) { + for (int i = 0; i < countAtStart.length; i++) { + if (heartbeatCounts[i] <= countAtStart[i]) { + return false; + } + } + return true; + } + } + }, 200, 10000); + } + + private ReceivedDeletedBlockInfo[] waitForBlockReceived( ExtendedBlock fakeBlock, DatanodeProtocol mockNN) throws Exception {
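Finally, a quick round-trip exercise for the new NNHAStatusHeartbeat Writable introduced by this patch (assumes the patch is applied; the byte buffer merely stands in for the DN/NN wire, and asserts require running with -ea):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;

    public class NNHAStatusRoundTrip {
      public static void main(String[] args) throws IOException {
        NNHAStatusHeartbeat sent =
            new NNHAStatusHeartbeat(NNHAStatusHeartbeat.State.ACTIVE, 42L);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        sent.write(new DataOutputStream(bytes));      // writeEnum + writeLong

        NNHAStatusHeartbeat received = new NNHAStatusHeartbeat();
        received.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        assert received.getState() == NNHAStatusHeartbeat.State.ACTIVE;
        assert received.getTxId() == 42L;
      }
    }

One observation worth flagging from the R23-compatible hunk above: the static HeartbeatResponseWritable.convert(HeartbeatResponse) builds the writable from commands only, leaving haStatus unset, while write() unconditionally calls haStatus.write(out); a response converted along that path would therefore hit a NullPointerException on serialization, presumably to be addressed in a follow-up.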