From 881034ad452781399f1be6046c2b38a2f2f524d1 Mon Sep 17 00:00:00 2001 From: dannytbecker <43830149+dannytbecker@users.noreply.github.com> Date: Wed, 1 May 2024 13:56:53 -0700 Subject: [PATCH] CachedRecordStore should check if the record state is expired (#6783) --- .../federation/store/CachedRecordStore.java | 2 +- .../store/TestStateStoreMembershipState.java | 101 ++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 08dcc1c6e4..59da614535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -189,7 +189,7 @@ public void overrideExpiredRecords(QueryResult query) throws IOException { LOG.warn("Couldn't delete State Store record {}: {}", recordName, record); } - } else if (record.checkExpired(currentDriverTime)) { + } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) { String recordName = StateStoreUtils.getRecordName(record.getClass()); LOG.info("Override State Store record {}: {}", recordName, record); commitRecords.add(record); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java index f7f0970bd3..9e3eb97853 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java @@ -29,11 +29,18 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -49,16 +56,22 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.hadoop.util.Time; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test the basic {@link MembershipStore} membership functionality. */ public class TestStateStoreMembershipState extends TestStateStoreBase { + private static Logger LOG = LoggerFactory.getLogger( + TestStateStoreMembershipState.class); + private static MembershipStore membershipStore; @BeforeClass @@ -529,6 +542,94 @@ public void testRegistrationExpiredAndDeletion() }, 100, 3000); } + @Test + public void testRegistrationExpiredRaceCondition() + throws InterruptedException, IOException, TimeoutException, ExecutionException { + + // Populate the state store with a single NN element + // 1) ns0:nn0 - Expired + // Create a thread to refresh the cached records, pulling the expired record + // into the thread's memory + // Then insert an active record, and confirm that the refresh thread does not + // override the active record with the expired record it has in memory + + MembershipState.setDeletionMs(-1); + + MembershipState expiredReport = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.ACTIVE); + expiredReport.setDateModified(Time.monotonicNow() - 5000); + expiredReport.setState(FederationNamenodeServiceState.EXPIRED); + assertTrue(namenodeHeartbeat(expiredReport)); + + // Load cache + MembershipStore memStoreSpy = spy(membershipStore); + DelayAnswer delayer = new DelayAnswer(LOG); + doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any()); + + ExecutorService pool = Executors.newFixedThreadPool(1); + + Future cacheRefreshFuture = pool.submit(() -> { + try { + return memStoreSpy.loadCache(true); + } catch (IOException e) { + LOG.error("Exception while loading cache:", e); + } + return false; + }); + + // Verify quorum and entry + MembershipState quorumEntry = getNamenodeRegistration( + expiredReport.getNameserviceId(), expiredReport.getNamenodeId()); + assertNull(quorumEntry); + + + MembershipState record = membershipStore.getDriver() + .get(MembershipState.class).getRecords().get(0); + assertNotNull(record); + assertEquals(ROUTERS[0], record.getRouterId()); + assertEquals(FederationNamenodeServiceState.EXPIRED, + record.getState()); + + // Insert active while the other thread refreshing it's cache + MembershipState activeReport = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.ACTIVE); + + delayer.waitForCall(); + assertTrue(namenodeHeartbeat(activeReport)); + + record = membershipStore.getDriver() + .get(MembershipState.class).getRecords().get(0); + assertNotNull(record); + assertEquals(ROUTERS[0], record.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, + record.getState()); + + quorumEntry = getExpiredNamenodeRegistration( + expiredReport.getNameserviceId(), expiredReport.getNamenodeId()); + assertNull(quorumEntry); + + // Allow the thread to finish refreshing the cache + delayer.proceed(); + assertTrue(cacheRefreshFuture.get(5, TimeUnit.SECONDS)); + + // The state store should still be the active report + record = membershipStore.getDriver() + .get(MembershipState.class).getRecords().get(0); + assertNotNull(record); + assertEquals(ROUTERS[0], record.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, + record.getState()); + + membershipStore.loadCache(true); + + quorumEntry = getExpiredNamenodeRegistration( + expiredReport.getNameserviceId(), + expiredReport.getNamenodeId()); + assertNull(quorumEntry); + } + @Test public void testNamespaceInfoWithUnavailableNameNodeRegistration() throws IOException {