diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 862f851fb5..db1dcdf181 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -193,13 +193,53 @@ private void updateNameNodeState(final String nsId, } } + /** + * Try to shuffle the multiple observer namenodes if listObserversFirst is true. + * @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true, + * all observers will be placed at the front of the collection. + * @param listObserversFirst true if we need to shuffle the multiple front observer namenodes. + * @return a list of FederationNamenodeContext. + * @param a subclass of FederationNamenodeContext. + */ + private List shuffleObserverNN( + List inputNameNodes, boolean listObserversFirst) { + if (!listObserversFirst) { + return inputNameNodes; + } + // Get Observers first. + List observerList = new ArrayList<>(); + for (T t : inputNameNodes) { + if (t.getState() == OBSERVER) { + observerList.add(t); + } else { + // The inputNameNodes are already sorted, so it can break + // when the first non-observer is encountered. + break; + } + } + // Returns the inputNameNodes if no shuffle is required + if (observerList.size() <= 1) { + return inputNameNodes; + } + + // Shuffle multiple Observers + Collections.shuffle(observerList); + + List ret = new ArrayList<>(inputNameNodes.size()); + ret.addAll(observerList); + for (int i = observerList.size(); i < inputNameNodes.size(); i++) { + ret.add(inputNameNodes.get(i)); + } + return Collections.unmodifiableList(ret); + } + @Override public List getNamenodesForNameserviceId( final String nsId, boolean listObserversFirst) throws IOException { List ret = cacheNS.get(Pair.of(nsId, listObserversFirst)); if (ret != null) { - return ret; + return shuffleObserverNN(ret, listObserversFirst); } // Not cached, generate the value diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java index b602a27c95..05d21b9b27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java @@ -90,6 +90,98 @@ public void setup() throws IOException, InterruptedException { assertTrue(cleared); } + @Test + public void testShuffleObserverNNs() throws Exception { + // Add an active entry to the store + NamenodeStatusReport activeReport = createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE); + assertTrue(namenodeResolver.registerNamenode(activeReport)); + + // Add a standby entry to the store + NamenodeStatusReport standbyReport = createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY); + assertTrue(namenodeResolver.registerNamenode(standbyReport)); + + // Load cache + stateStore.refreshCaches(true); + + // Get namenodes from state store. + List withoutObserver = + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(2, withoutObserver.size()); + assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState()); + + // Get namenodes from cache. + withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(2, withoutObserver.size()); + assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState()); + + // Add an observer entry to the store + NamenodeStatusReport observerReport1 = createNamenodeReport( + NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER); + assertTrue(namenodeResolver.registerNamenode(observerReport1)); + + // Load cache + stateStore.refreshCaches(true); + + // Get namenodes from state store. + List observerList = + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(3, observerList.size()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState()); + assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState()); + + // Get namenodes from cache. + observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(3, observerList.size()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState()); + assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState()); + + // Add one new observer entry to the store + NamenodeStatusReport observerReport2 = createNamenodeReport( + NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER); + assertTrue(namenodeResolver.registerNamenode(observerReport2)); + + // Load cache + stateStore.refreshCaches(true); + + // Get namenodes from state store. + List observerList2 = + namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(4, observerList2.size()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState()); + assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState()); + + // Get namenodes from cache. + observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(4, observerList2.size()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState()); + assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState()); + assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState()); + + // Test shuffler + List observerList3; + boolean hit = false; + for (int i = 0; i < 1000; i++) { + observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState()); + assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState()); + if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) && + observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) { + hit = true; + break; + } + } + assertTrue(hit); + } + @Test public void testStateStoreDisconnected() throws Exception {