HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1211735 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-12-08 02:00:20 +00:00
parent f39aac60e0
commit 6016e95fee
10 changed files with 434 additions and 30 deletions

View File

@ -45,3 +45,5 @@ HDFS-2626. BPOfferService.verifyAndSetNamespaceInfo needs to be synchronized (to
HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop ProtocolTranslators (todd)
HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse type (todd)
HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses (todd)

View File

@ -37,14 +37,15 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.RPC;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
@ -75,10 +76,31 @@ class BPOfferService {
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
private BPServiceActor bpServiceToActive;
/**
* A reference to the BPServiceActor associated with the currently
* ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
* this can be null. If non-null, this must always refer to a member
* of the {@link #bpServices} list.
*/
private BPServiceActor bpServiceToActive = null;
/**
* The list of all actors for namenodes in this nameservice, regardless
* of their active or standby states.
*/
private List<BPServiceActor> bpServices =
new CopyOnWriteArrayList<BPServiceActor>();
/**
* Each time we receive a heartbeat from a NN claiming to be ACTIVE,
* we record that NN's most recent transaction ID here, so long as it
* is more recent than the previous value. This allows us to detect
* split-brain scenarios in which a prior NN is still asserting its
* ACTIVE state but with a too-low transaction ID. See HDFS-2627
* for details.
*/
private long lastActiveClaimTxId = -1;
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
@ -87,10 +109,6 @@ class BPOfferService {
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
// TODO(HA): currently we just make the first one the initial
// active. In reality it should start in an unknown state and then
// as we figure out which is active, designate one as such.
this.bpServiceToActive = this.bpServices.get(0);
}
void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
@ -109,19 +127,23 @@ void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
}
/**
* returns true if BP thread has completed initialization of storage
* and has registered with the corresponding namenode
* @return true if initialized
* @return true if the service has registered with at least one NameNode.
*/
boolean isInitialized() {
// TODO(HA) is this right?
return bpServiceToActive != null && bpServiceToActive.isInitialized();
return bpRegistration != null;
}
/**
* @return true if there is at least one actor thread running which is
* talking to a NameNode.
*/
boolean isAlive() {
// TODO: should || all the bp actors probably?
return bpServiceToActive != null &&
bpServiceToActive.isAlive();
for (BPServiceActor actor : bpServices) {
if (actor.isAlive()) {
return true;
}
}
return false;
}
String getBlockPoolId() {
@ -322,7 +344,7 @@ DatanodeRegistration createRegistration() {
* Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN.
*/
void shutdownActor(BPServiceActor actor) {
synchronized void shutdownActor(BPServiceActor actor) {
if (bpServiceToActive == actor) {
bpServiceToActive = null;
}
@ -339,7 +361,7 @@ void shutdownActor(BPServiceActor actor) {
}
@Deprecated
InetSocketAddress getNNSocketAddress() {
synchronized InetSocketAddress getNNSocketAddress() {
// TODO(HA) this doesn't make sense anymore
return bpServiceToActive.getNNSocketAddress();
}
@ -383,8 +405,61 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
* @return a proxy to the active NN
*/
@Deprecated
DatanodeProtocol getActiveNN() {
return bpServiceToActive.bpNamenode;
synchronized DatanodeProtocol getActiveNN() {
if (bpServiceToActive != null) {
return bpServiceToActive.bpNamenode;
} else {
return null;
}
}
/**
* Update the BPOS's view of which NN is active, based on a heartbeat
* response from one of the actors.
*
* @param actor the actor which received the heartbeat
* @param nnHaState the HA-related heartbeat contents
*/
synchronized void updateActorStatesFromHeartbeat(
BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) {
final long txid = nnHaState.getTxId();
final boolean nnClaimsActive =
nnHaState.getState() == NNHAStatusHeartbeat.State.ACTIVE;
final boolean bposThinksActive = bpServiceToActive == actor;
final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
if (nnClaimsActive && !bposThinksActive) {
LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
"txid=" + txid);
if (!isMoreRecentClaim) {
// Split-brain scenario - an NN is trying to claim active
// state when a different NN has already claimed it with a higher
// txid.
LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
txid + " but there was already a more recent claim at txid=" +
lastActiveClaimTxId);
return;
} else {
if (bpServiceToActive == null) {
LOG.info("Acknowledging ACTIVE Namenode " + actor);
} else {
LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
bpServiceToActive + " at higher txid=" + txid);
}
bpServiceToActive = actor;
}
} else if (!nnClaimsActive && bposThinksActive) {
LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
"txid=" + nnHaState.getTxId());
bpServiceToActive = null;
}
if (bpServiceToActive == actor) {
assert txid >= lastActiveClaimTxId;
lastActiveClaimTxId = txid;
}
}
/**
@ -415,7 +490,17 @@ void triggerBlockReportForTests() throws IOException {
}
}
boolean processCommandFromActor(DatanodeCommand cmd,
/**
* Run an immediate heartbeat from all actors. Used by tests.
*/
@VisibleForTesting
void triggerHeartbeatForTests() throws IOException {
for (BPServiceActor actor : bpServices) {
actor.triggerHeartbeatForTests();
}
}
synchronized boolean processCommandFromActor(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
assert bpServices.contains(actor);
if (actor == bpServiceToActive) {

View File

@ -284,6 +284,14 @@ void triggerBlockReportForTests() throws IOException {
lastBlockReport = 0;
blockReport();
}
@VisibleForTesting
void triggerHeartbeatForTests() throws IOException {
synchronized (receivedAndDeletedBlockList) {
lastHeartbeat = 0;
receivedAndDeletedBlockList.notifyAll();
}
}
/**
* Report the list blocks to the Namenode
@ -420,8 +428,18 @@ private void offerService() throws Exception {
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
assert resp != null;
dn.getMetrics().addHeartbeat(now() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// Important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
long startProcessCommands = now();
if (!processCommand(resp.getCommands()))
continue;

View File

@ -150,11 +150,16 @@
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -308,6 +313,12 @@ private static final void logAuditEvent(UserGroupInformation ugi,
* Used when this NN is in standby state to read from the shared edit log.
*/
private EditLogTailer editLogTailer = null;
/**
* Reference to the NN's HAContext object. This is only set once
* {@link #startCommonServices(Configuration, HAContext)} is called.
*/
private HAContext haContext;
PendingDataNodeMessages getPendingDataNodeMessages() {
return pendingDatanodeMessages;
@ -434,11 +445,13 @@ void stopSecretManager() {
/**
* Start services common to both active and standby states
* @param haContext
* @throws IOException
*/
void startCommonServices(Configuration conf) throws IOException {
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
@ -2706,12 +2719,28 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
cmds = new DatanodeCommand[] {cmd};
}
}
return new HeartbeatResponse(cmds);
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
}
}
private NNHAStatusHeartbeat createHaStatusHeartbeat() {
HAState state = haContext.getState();
NNHAStatusHeartbeat.State hbState;
if (state instanceof ActiveState) {
hbState = NNHAStatusHeartbeat.State.ACTIVE;
} else if (state instanceof StandbyState) {
hbState = NNHAStatusHeartbeat.State.STANDBY;
} else {
throw new AssertionError("Invalid state: " + state.getClass());
}
return new NNHAStatusHeartbeat(hbState,
Math.max(getFSImage().getLastAppliedTxId(),
getFSImage().getEditLog().getLastWrittenTxId()));
}
/**
* Returns whether or not there were available resources at the last check of
* resources.

View File

@ -426,7 +426,7 @@ protected void validateConfigurationSettings(final Configuration conf)
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
namesystem.startCommonServices(conf);
namesystem.startCommonServices(conf, haContext);
startHttpServer(conf);
rpcServer.start();
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,

View File

@ -35,17 +35,26 @@ public class HeartbeatResponse implements Writable {
/** Commands returned from the namenode to the datanode */
private DatanodeCommand[] commands;
/** Information about the current HA-related state of the NN */
private NNHAStatusHeartbeat haStatus;
public HeartbeatResponse() {
// Empty constructor required for Writable
}
public HeartbeatResponse(DatanodeCommand[] cmds) {
public HeartbeatResponse(DatanodeCommand[] cmds,
NNHAStatusHeartbeat haStatus) {
commands = cmds;
this.haStatus = haStatus;
}
public DatanodeCommand[] getCommands() {
return commands;
}
public NNHAStatusHeartbeat getNameNodeHaState() {
return haStatus;
}
///////////////////////////////////////////
// Writable
@ -58,6 +67,7 @@ public void write(DataOutput out) throws IOException {
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
haStatus.write(out);
}
@Override
@ -69,5 +79,7 @@ public void readFields(DataInput in) throws IOException {
commands[i] = (DatanodeCommand) ObjectWritable.readObject(in,
objectWritable, null);
}
haStatus = new NNHAStatusHeartbeat();
haStatus.readFields(in);
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class NNHAStatusHeartbeat implements Writable {
private State state;
private long txid = HdfsConstants.INVALID_TXID;
public NNHAStatusHeartbeat() {
}
public NNHAStatusHeartbeat(State state, long txid) {
this.state = state;
this.txid = txid;
}
public State getState() {
return state;
}
public long getTxId() {
return txid;
}
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, state);
out.writeLong(txid);
}
@Override
public void readFields(DataInput in) throws IOException {
state = WritableUtils.readEnum(in, State.class);
txid = in.readLong();
}
@InterfaceAudience.Private
public enum State {
ACTIVE,
STANDBY;
}
}

View File

@ -31,6 +31,7 @@
@InterfaceStability.Evolving
public class HeartbeatResponseWritable implements Writable {
private DatanodeCommandWritable[] commands;
private NNHAStatusHeartbeatWritable haStatus;
public HeartbeatResponseWritable() {
// Empty constructor for Writable
@ -41,7 +42,8 @@ public HeartbeatResponseWritable(DatanodeCommandWritable[] cmds) {
}
public HeartbeatResponse convert() {
return new HeartbeatResponse(DatanodeCommandWritable.convert(commands));
return new HeartbeatResponse(DatanodeCommandWritable.convert(commands),
NNHAStatusHeartbeatWritable.convert(haStatus));
}
///////////////////////////////////////////
@ -55,6 +57,7 @@ public void write(DataOutput out) throws IOException {
ObjectWritable.writeObject(out, commands[i], commands[i].getClass(),
null, true);
}
haStatus.write(out);
}
@Override
@ -66,6 +69,8 @@ public void readFields(DataInput in) throws IOException {
commands[i] = (DatanodeCommandWritable) ObjectWritable.readObject(in,
objectWritable, null);
}
haStatus = new NNHAStatusHeartbeatWritable();
haStatus.readFields(in);
}
public static HeartbeatResponseWritable convert(
@ -73,4 +78,4 @@ public static HeartbeatResponseWritable convert(
return new HeartbeatResponseWritable(DatanodeCommandWritable.convert(resp
.getCommands()));
}
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@InterfaceAudience.Private
@InterfaceStability.Evolving
/**
* Response to {@link DatanodeProtocol#sendHeartbeat}
*/
public class NNHAStatusHeartbeatWritable implements Writable {
private State state;
private long txid = HdfsConstants.INVALID_TXID;
public NNHAStatusHeartbeatWritable() {
}
public NNHAStatusHeartbeatWritable(State state, long txid) {
this.state = state;
this.txid = txid;
}
public State getState() {
return state;
}
public long getTxId() {
return txid;
}
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, state);
out.writeLong(txid);
}
@Override
public void readFields(DataInput in) throws IOException {
state = WritableUtils.readEnum(in, State.class);
txid = in.readLong();
}
public static NNHAStatusHeartbeat convert(
NNHAStatusHeartbeatWritable haStatus) {
return new NNHAStatusHeartbeat(haStatus.getState(), haStatus.getTxId());
}
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import org.apache.commons.logging.Log;
@ -32,9 +33,12 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.test.GenericTestUtils;
@ -43,6 +47,8 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@ -63,13 +69,15 @@ public class TestBPOfferService {
private DatanodeProtocol mockNN1;
private DatanodeProtocol mockNN2;
private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
private int heartbeatCounts[] = new int[2];
private DataNode mockDn;
private FSDatasetInterface mockFSDataset;
@Before
public void setupMocks() throws Exception {
mockNN1 = setupNNMock();
mockNN2 = setupNNMock();
mockNN1 = setupNNMock(0);
mockNN2 = setupNNMock(1);
// Set up a mock DN with the bare-bones configuration
// objects, etc.
@ -92,14 +100,17 @@ public void setupMocks() throws Exception {
/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
private DatanodeProtocol setupNNMock() throws Exception {
private DatanodeProtocol setupNNMock(int nnIdx) throws Exception {
DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class);
Mockito.doReturn(
new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
0, HdfsConstants.LAYOUT_VERSION))
.when(mock).versionRequest();
Mockito.doReturn(new HeartbeatResponse(null))
Mockito.doReturn(new DatanodeRegistration("fake-node"))
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
.when(mock).sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.anyLong(),
@ -109,10 +120,31 @@ private DatanodeProtocol setupNNMock() throws Exception {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt());
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(State.STANDBY, 0);
return mock;
}
/**
* Mock answer for heartbeats which returns an empty set of commands
* and the HA status for the chosen NN from the
* {@link TestBPOfferService#mockHaStatuses} array.
*/
private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
public HeartbeatAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
heartbeatCounts[nnIdx]++;
return new HeartbeatResponse(new DatanodeCommand[0],
mockHaStatuses[nnIdx]);
}
}
/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
@ -204,6 +236,53 @@ public void testNNsFromDifferentClusters() throws Exception {
bpos.stop();
}
}
/**
* Test that the DataNode determines the active NameNode correctly
* based on the HA-related information in heartbeat responses.
* See HDFS-2627.
*/
@Test
public void testPickActiveNameNode() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 1);
waitForHeartbeats(bpos);
assertSame(mockNN1, bpos.getActiveNN());
// NN2 claims active at a higher txid
mockHaStatuses[1] = new NNHAStatusHeartbeat(State.ACTIVE, 2);
waitForHeartbeats(bpos);
assertSame(mockNN2, bpos.getActiveNN());
// Even after another heartbeat from the first NN, it should
// think NN2 is active, since it claimed a higher txid
waitForHeartbeats(bpos);
assertSame(mockNN2, bpos.getActiveNN());
// Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
// because NN1's txid is lower than the last active txid. Instead,
// it should consider neither active.
mockHaStatuses[1] = new NNHAStatusHeartbeat(State.STANDBY, 2);
waitForHeartbeats(bpos);
assertNull(bpos.getActiveNN());
// Now if NN1 goes back to a higher txid, it should be considered active
mockHaStatuses[0] = new NNHAStatusHeartbeat(State.ACTIVE, 3);
waitForHeartbeats(bpos);
assertSame(mockNN1, bpos.getActiveNN());
} finally {
bpos.stop();
}
}
private void waitForOneToFail(final BPOfferService bpos)
throws Exception {
@ -269,6 +348,30 @@ public Boolean get() {
}, 500, 10000);
}
private void waitForHeartbeats(BPOfferService bpos)
throws Exception {
final int countAtStart[];
synchronized (heartbeatCounts) {
countAtStart = Arrays.copyOf(
heartbeatCounts, heartbeatCounts.length);
}
bpos.triggerHeartbeatForTests();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (heartbeatCounts) {
for (int i = 0; i < countAtStart.length; i++) {
if (heartbeatCounts[i] <= countAtStart[i]) {
return false;
}
}
return true;
}
}
}, 200, 10000);
}
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
ExtendedBlock fakeBlock,
DatanodeProtocol mockNN) throws Exception {