diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 1daebdc77e..a2bec12226 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -201,10 +201,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "membership.expiration"; public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS + = FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS + ".deletion"; + public static final long + FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT = -1; public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS = FEDERATION_STORE_PREFIX + "router.expiration"; public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + public static final String FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS = + FEDERATION_STORE_ROUTER_EXPIRATION_MS + ".deletion"; + public static final long + FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1; // HDFS Router safe mode public static final String DFS_ROUTER_SAFEMODE_ENABLE = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 5cfb521fa0..7b28c03a52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.locks.Lock; @@ -164,13 +163,15 @@ private boolean isUpdateTime() { /** * Updates the state store with any record overrides we detected, such as an - * expired state. + * expired state. If an expired record exists beyond deletion time, it is + * removed. * * @param query RecordQueryResult containing the data to be inspected. * @throws IOException If the values cannot be updated. */ public void overrideExpiredRecords(QueryResult query) throws IOException { List commitRecords = new ArrayList<>(); + List deleteRecords = new ArrayList<>(); List newRecords = query.getRecords(); long currentDriverTime = query.getTimestamp(); if (newRecords == null || currentDriverTime <= 0) { @@ -178,7 +179,16 @@ public void overrideExpiredRecords(QueryResult query) throws IOException { return; } for (R record : newRecords) { - if (record.checkExpired(currentDriverTime)) { + if (record.shouldBeDeleted(currentDriverTime)) { + String recordName = StateStoreUtils.getRecordName(record.getClass()); + if (getDriver().remove(record)) { + deleteRecords.add(record); + LOG.info("Deleted State Store record {}: {}", recordName, record); + } else { + LOG.warn("Couldn't delete State Store record {}: {}", recordName, + record); + } + } else if (record.checkExpired(currentDriverTime)) { String recordName = StateStoreUtils.getRecordName(record.getClass()); LOG.info("Override State Store record {}: {}", recordName, record); commitRecords.add(record); @@ -187,6 +197,9 @@ public void overrideExpiredRecords(QueryResult query) throws IOException { if (commitRecords.size() > 0) { getDriver().putAll(commitRecords, true, false); } + if (deleteRecords.size() > 0) { + newRecords.removeAll(deleteRecords); + } } /** @@ -197,7 +210,8 @@ public void overrideExpiredRecords(QueryResult query) throws IOException { * @throws IOException If the values cannot be updated. */ public void overrideExpiredRecord(R record) throws IOException { - List newRecords = Collections.singletonList(record); + List newRecords = new ArrayList<>(); + newRecords.add(record); long time = getDriver().getTime(); QueryResult query = new QueryResult<>(newRecords, time); overrideExpiredRecords(query); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 37b62fb0b0..66c288238e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -160,15 +160,27 @@ protected void serviceInit(Configuration config) throws Exception { this.addService(monitorService); // Set expirations intervals for each record - MembershipState.setExpirationMs(conf.getLong( + MembershipState.setExpirationMs(conf.getTimeDuration( RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, - RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT)); + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + + MembershipState.setDeletionMs(conf.getTimeDuration( + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS, + RBFConfigKeys + .FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT, + TimeUnit.MILLISECONDS)); RouterState.setExpirationMs(conf.getTimeDuration( RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT, TimeUnit.MILLISECONDS)); + RouterState.setDeletionMs(conf.getTimeDuration( + RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS, + RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + // Cache update service this.cacheUpdater = new StateStoreCacheUpdateService(this); addService(this.cacheUpdater); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java index 7212f3afbb..86721eaa47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java @@ -74,6 +74,26 @@ public abstract class BaseRecord implements Comparable { */ public abstract long getExpirationMs(); + /** + * Check if this record is expired. The default is false. Override for + * customized behavior. + * + * @return True if the record is expired. + */ + public boolean isExpired() { + return false; + } + + /** + * Get the deletion time for the expired record. The default is disabled. + * Override for customized behavior. + * + * @return Deletion time for the expired record. + */ + public long getDeletionMs() { + return -1; + } + /** * Map of primary key names to values for the record. The primary key can be * a combination of 1-n different State Store serialized values. @@ -202,12 +222,34 @@ public int compareTo(BaseRecord record) { */ public boolean checkExpired(long currentTime) { long expiration = getExpirationMs(); - if (getDateModified() > 0 && expiration > 0) { - return (getDateModified() + expiration) < currentTime; + long modifiedTime = getDateModified(); + if (modifiedTime > 0 && expiration > 0) { + return (modifiedTime + expiration) < currentTime; } return false; } + /** + * Called when this record is expired and expired deletion is enabled, checks + * for the deletion. If an expired record exists beyond the deletion time, it + * should be deleted. + * + * @param currentTime The current timestamp in ms from the data store, to be + * compared against the modification and creation dates of the + * object. + * @return boolean True if the record has been updated and should be + * deleted from the data store. + */ + public boolean shouldBeDeleted(long currentTime) { + long deletionTime = getDeletionMs(); + if (isExpired() && deletionTime > 0) { + long elapsedTime = currentTime - (getDateModified() + getExpirationMs()); + return elapsedTime > deletionTime; + } else { + return false; + } + } + /** * Validates the record. Called when the record is created, populated from the * state store, and before committing to the state store. If validate failed, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java index 642c72b010..8b9f71338c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java @@ -48,6 +48,8 @@ public abstract class MembershipState extends BaseRecord /** Expiration time in ms for this entry. */ private static long expirationMs; + /** Deletion time in ms for this expired entry. */ + private static long deletionMs; /** Comparator based on the name.*/ public static final Comparator NAME_COMPARATOR = @@ -330,4 +332,23 @@ public long getExpirationMs() { public static void setExpirationMs(long time) { MembershipState.expirationMs = time; } + + @Override + public boolean isExpired() { + return getState() == EXPIRED; + } + + @Override + public long getDeletionMs() { + return MembershipState.deletionMs; + } + + /** + * Set the deletion time for this class. + * + * @param time Deletion time in milliseconds. + */ + public static void setDeletionMs(long time) { + MembershipState.deletionMs = time; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java index 64c2c71fe9..a5cda4b041 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.federation.store.records; -import java.util.Collections; import java.util.List; /** @@ -42,7 +41,7 @@ public QueryResult(final List recs, final long time) { * @return List of records. */ public List getRecords() { - return Collections.unmodifiableList(this.records); + return this.records; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java index 2fe6941ba1..761e2a4872 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java @@ -40,6 +40,9 @@ public abstract class RouterState extends BaseRecord { /** Expiration time in ms for this entry. */ private static long expirationMs; + /** Deletion time in ms for this entry when it is expired. */ + private static long deletionMs; + /** * Constructors. */ @@ -169,4 +172,18 @@ public long getExpirationMs() { public static void setExpirationMs(long time) { RouterState.expirationMs = time; } + + @Override + public boolean isExpired() { + return getStatus() == RouterServiceState.EXPIRED; + } + + @Override + public long getDeletionMs() { + return RouterState.deletionMs; + } + + public static void setDeletionMs(long time) { + RouterState.deletionMs = time; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java index 614957b16c..4e2868a85a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java @@ -315,7 +315,9 @@ public long getLastContact() { @Override public void setDateModified(long time) { - this.translator.getBuilder().setDateModified(time); + if (getState() != FederationNamenodeServiceState.EXPIRED) { + this.translator.getBuilder().setDateModified(time); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java index d837386585..107996c21b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java @@ -182,7 +182,9 @@ public long getDateStarted() { @Override public void setDateModified(long time) { - this.translator.getBuilder().setDateModified(time); + if (getStatus() != RouterServiceState.EXPIRED) { + this.translator.getBuilder().setDateModified(time); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 3f743f9774..fba0869dd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -345,6 +345,16 @@ + + dfs.federation.router.store.membership.expiration.deletion + -1 + + Deletion time in milliseconds for a membership record. If an expired + membership record exists beyond this time, it will be deleted. If this + value is negative, the deletion is disabled. + + + dfs.federation.router.heartbeat.enable true @@ -391,6 +401,16 @@ + + dfs.federation.router.store.router.expiration.deletion + -1 + + Deletion time in milliseconds for a router state record. If an expired + router state record exists beyond this time, it will be deleted. If this + value is negative, the deletion is disabled. + + + dfs.federation.router.safemode.enable true diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java index dd349daa10..f1f15c67b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.Before; import org.junit.BeforeClass; @@ -57,10 +59,14 @@ public class TestStateStoreMembershipState extends TestStateStoreBase { @BeforeClass public static void create() { - // Reduce expirations to 5 seconds + // Reduce expirations to 2 seconds getConf().setLong( RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, - TimeUnit.SECONDS.toMillis(5)); + TimeUnit.SECONDS.toMillis(2)); + // Set deletion time to 2 seconds + getConf().setLong( + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS, + TimeUnit.SECONDS.toMillis(2)); } @Before @@ -363,8 +369,8 @@ public void testRegistrationNoQuorum() } @Test - public void testRegistrationExpired() - throws InterruptedException, IOException { + public void testRegistrationExpiredAndDeletion() + throws InterruptedException, IOException, TimeoutException { // Populate the state store with a single NN element // 1) ns0:nn0 - Active @@ -385,20 +391,32 @@ public void testRegistrationExpired() assertNotNull(quorumEntry); assertEquals(ROUTERS[0], quorumEntry.getRouterId()); assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); - - // Wait past expiration (set in conf to 5 seconds) - Thread.sleep(6000); - // Reload cache - assertTrue(getStateStore().loadCache(MembershipStore.class, true)); - - // Verify entry is now expired and is no longer in the cache - quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); + quorumEntry = getExpiredNamenodeRegistration(report.getNameserviceId(), + report.getNamenodeId()); assertNull(quorumEntry); + // Wait past expiration (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + // Verify entry is expired and is no longer in the cache + return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null; + } catch (IOException e) { + return false; + } + }, 100, 3000); + + // Verify entry is in expired membership records + quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); + assertNotNull(quorumEntry); + // Verify entry is now expired and can't be used by RPC service quorumEntry = getNamenodeRegistration( report.getNameserviceId(), report.getNamenodeId()); assertNull(quorumEntry); + quorumEntry = getExpiredNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); // Heartbeat again, updates dateModified assertTrue(namenodeHeartbeat(report)); @@ -411,6 +429,36 @@ public void testRegistrationExpired() assertNotNull(quorumEntry); assertEquals(ROUTERS[0], quorumEntry.getRouterId()); assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); + quorumEntry = getExpiredNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNull(quorumEntry); + + // Wait past expiration (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + // Verify entry is expired and is no longer in the cache + return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null; + } catch (IOException e) { + return false; + } + }, 100, 3000); + + // Verify entry is in expired membership records + quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); + assertNotNull(quorumEntry); + + // Wait past deletion (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + // Verify entry is deleted from even the expired membership records + return getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) + == null; + } catch (IOException e) { + return false; + } + }, 100, 3000); } /** @@ -441,6 +489,34 @@ private MembershipState getNamenodeRegistration( return null; } + /** + * Get a single expired namenode membership record from the store. + * + * @param nsId The HDFS nameservice ID to search for + * @param nnId The HDFS namenode ID to search for + * @return The single expired NamenodeMembershipRecord that matches the query + * or null if not found. + * @throws IOException if the query could not be executed. + */ + private MembershipState getExpiredNamenodeRegistration( + final String nsId, final String nnId) throws IOException { + + MembershipState partial = MembershipState.newInstance(); + partial.setNameserviceId(nsId); + partial.setNamenodeId(nnId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + GetNamenodeRegistrationsResponse response = + membershipStore.getExpiredNamenodeRegistrations(request); + + List results = response.getNamenodeMemberships(); + if (results != null && results.size() == 1) { + MembershipState record = results.get(0); + return record; + } + return null; + } + /** * Register a namenode heartbeat with the state store. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java index db1df198c5..9b459f3ba2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; @@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.Before; import org.junit.BeforeClass; @@ -50,10 +52,14 @@ public class TestStateStoreRouterState extends TestStateStoreBase { @BeforeClass public static void create() { - // Reduce expirations to 5 seconds + // Reduce expirations to 2 seconds getConf().setTimeDuration( RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, - 5, TimeUnit.SECONDS); + 2, TimeUnit.SECONDS); + // Set deletion time to 2 seconds + getConf().setTimeDuration( + RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS, + 2, TimeUnit.SECONDS); } @Before @@ -130,8 +136,8 @@ public void testUpdateRouterStatus() } @Test - public void testRouterStateExpired() - throws IOException, InterruptedException { + public void testRouterStateExpiredAndDeletion() + throws IOException, InterruptedException, TimeoutException { long dateStarted = Time.now(); String address = "testaddress"; @@ -149,17 +155,46 @@ public void testRouterStateExpired() routerStore.getRouterRegistration(getRequest).getRouter(); assertNotNull(record); - // Wait past expiration (set to 5 sec in config) - Thread.sleep(6000); + // Wait past expiration (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + RouterState routerState = routerStore + .getRouterRegistration(getRequest).getRouter(); + // Verify entry is expired + return routerState.getStatus() == RouterServiceState.EXPIRED; + } catch (IOException e) { + return false; + } + }, 100, 3000); - // Verify expired - RouterState r = routerStore.getRouterRegistration(getRequest).getRouter(); - assertEquals(RouterServiceState.EXPIRED, r.getStatus()); - - // Heartbeat again and this shouldn't be EXPIRED anymore + // Heartbeat again and this shouldn't be EXPIRED at this point assertTrue(routerStore.routerHeartbeat(request).getStatus()); - r = routerStore.getRouterRegistration(getRequest).getRouter(); + RouterState r = routerStore.getRouterRegistration(getRequest).getRouter(); assertEquals(RouterServiceState.RUNNING, r.getStatus()); + + // Wait past expiration (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + RouterState routerState = routerStore + .getRouterRegistration(getRequest).getRouter(); + // Verify entry is expired + return routerState.getStatus() == RouterServiceState.EXPIRED; + } catch (IOException e) { + return false; + } + }, 100, 3000); + + // Wait deletion (set in conf to 2 seconds) + GenericTestUtils.waitFor(() -> { + try { + RouterState routerState = routerStore + .getRouterRegistration(getRequest).getRouter(); + // Verify entry is deleted + return routerState.getStatus() == null; + } catch (IOException e) { + return false; + } + }, 100, 3000); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index d6c829ba7e..b3a9fb5adc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -184,7 +184,10 @@ private boolean validateRecord( long now = stateStore.getDriver().getTime(); assertTrue( committed.getDateCreated() <= now && committed.getDateCreated() > 0); - assertTrue(committed.getDateModified() >= committed.getDateCreated()); + // since expired record doesn't update the modification time, let's skip it + if (!committed.isExpired()) { + assertTrue(committed.getDateModified() >= committed.getDateCreated()); + } return ret; }