HDFS-16831. [RBF SBN] GetNamenodesForNameserviceId should shuffle Observer NameNodes every time (#5098)
This commit is contained in:
parent
17035da46e
commit
df093ef9af
@ -193,13 +193,53 @@ private void updateNameNodeState(final String nsId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to shuffle the multiple observer namenodes if listObserversFirst is true.
|
||||||
|
* @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true,
|
||||||
|
* all observers will be placed at the front of the collection.
|
||||||
|
* @param listObserversFirst true if we need to shuffle the multiple front observer namenodes.
|
||||||
|
* @return a list of FederationNamenodeContext.
|
||||||
|
* @param <T> a subclass of FederationNamenodeContext.
|
||||||
|
*/
|
||||||
|
private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
|
||||||
|
List<T> inputNameNodes, boolean listObserversFirst) {
|
||||||
|
if (!listObserversFirst) {
|
||||||
|
return inputNameNodes;
|
||||||
|
}
|
||||||
|
// Get Observers first.
|
||||||
|
List<T> observerList = new ArrayList<>();
|
||||||
|
for (T t : inputNameNodes) {
|
||||||
|
if (t.getState() == OBSERVER) {
|
||||||
|
observerList.add(t);
|
||||||
|
} else {
|
||||||
|
// The inputNameNodes are already sorted, so it can break
|
||||||
|
// when the first non-observer is encountered.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Returns the inputNameNodes if no shuffle is required
|
||||||
|
if (observerList.size() <= 1) {
|
||||||
|
return inputNameNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shuffle multiple Observers
|
||||||
|
Collections.shuffle(observerList);
|
||||||
|
|
||||||
|
List<T> ret = new ArrayList<>(inputNameNodes.size());
|
||||||
|
ret.addAll(observerList);
|
||||||
|
for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
|
||||||
|
ret.add(inputNameNodes.get(i));
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableList(ret);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
|
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
|
||||||
final String nsId, boolean listObserversFirst) throws IOException {
|
final String nsId, boolean listObserversFirst) throws IOException {
|
||||||
|
|
||||||
List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
|
List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
|
||||||
if (ret != null) {
|
if (ret != null) {
|
||||||
return ret;
|
return shuffleObserverNN(ret, listObserversFirst);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not cached, generate the value
|
// Not cached, generate the value
|
||||||
|
@ -90,6 +90,98 @@ public void setup() throws IOException, InterruptedException {
|
|||||||
assertTrue(cleared);
|
assertTrue(cleared);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShuffleObserverNNs() throws Exception {
|
||||||
|
// Add an active entry to the store
|
||||||
|
NamenodeStatusReport activeReport = createNamenodeReport(
|
||||||
|
NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
|
||||||
|
assertTrue(namenodeResolver.registerNamenode(activeReport));
|
||||||
|
|
||||||
|
// Add a standby entry to the store
|
||||||
|
NamenodeStatusReport standbyReport = createNamenodeReport(
|
||||||
|
NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
|
||||||
|
assertTrue(namenodeResolver.registerNamenode(standbyReport));
|
||||||
|
|
||||||
|
// Load cache
|
||||||
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
// Get namenodes from state store.
|
||||||
|
List<? extends FederationNamenodeContext> withoutObserver =
|
||||||
|
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(2, withoutObserver.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
|
||||||
|
|
||||||
|
// Get namenodes from cache.
|
||||||
|
withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(2, withoutObserver.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
|
||||||
|
|
||||||
|
// Add an observer entry to the store
|
||||||
|
NamenodeStatusReport observerReport1 = createNamenodeReport(
|
||||||
|
NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
|
||||||
|
assertTrue(namenodeResolver.registerNamenode(observerReport1));
|
||||||
|
|
||||||
|
// Load cache
|
||||||
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
// Get namenodes from state store.
|
||||||
|
List<? extends FederationNamenodeContext> observerList =
|
||||||
|
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(3, observerList.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
|
||||||
|
|
||||||
|
// Get namenodes from cache.
|
||||||
|
observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(3, observerList.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
|
||||||
|
|
||||||
|
// Add one new observer entry to the store
|
||||||
|
NamenodeStatusReport observerReport2 = createNamenodeReport(
|
||||||
|
NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
|
||||||
|
assertTrue(namenodeResolver.registerNamenode(observerReport2));
|
||||||
|
|
||||||
|
// Load cache
|
||||||
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
// Get namenodes from state store.
|
||||||
|
List<? extends FederationNamenodeContext> observerList2 =
|
||||||
|
namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(4, observerList2.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
|
||||||
|
|
||||||
|
// Get namenodes from cache.
|
||||||
|
observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(4, observerList2.size());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
|
||||||
|
|
||||||
|
// Test shuffler
|
||||||
|
List<? extends FederationNamenodeContext> observerList3;
|
||||||
|
boolean hit = false;
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState());
|
||||||
|
assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState());
|
||||||
|
if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) &&
|
||||||
|
observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) {
|
||||||
|
hit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(hit);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStateStoreDisconnected() throws Exception {
|
public void testStateStoreDisconnected() throws Exception {
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user