HDFS-14593. RBF: Implement deletion feature for expired records in State Store. Contributed by Takanobu Asanuma.

This commit is contained in:
Ayush Saxena 2019-07-15 22:38:00 +05:30
parent 61bbdeee19
commit 64d4abf489
13 changed files with 288 additions and 37 deletions

View File

@ -201,10 +201,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_STORE_PREFIX + "membership.expiration";
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
// Time an expired membership record may remain in the State Store before it
// is deleted. Deletion only happens for a positive value, so the default of
// -1 leaves it disabled.
public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS
= FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS + ".deletion";
public static final long
FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT = -1;
public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
FEDERATION_STORE_PREFIX + "router.expiration";
public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
// Time an expired router state record may remain in the State Store before
// it is deleted. Deletion only happens for a positive value, so the default
// of -1 leaves it disabled.
public static final String FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS =
FEDERATION_STORE_ROUTER_EXPIRATION_MS + ".deletion";
public static final long
FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1;
// HDFS Router safe mode
public static final String DFS_ROUTER_SAFEMODE_ENABLE =

View File

@ -19,7 +19,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
@ -164,13 +163,15 @@ private boolean isUpdateTime() {
/**
* Updates the state store with any record overrides we detected, such as an
* expired state.
* expired state. If an expired record exists beyond deletion time, it is
* removed.
*
* @param query RecordQueryResult containing the data to be inspected.
* @throws IOException If the values cannot be updated.
*/
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
List<R> commitRecords = new ArrayList<>();
List<R> deleteRecords = new ArrayList<>();
List<R> newRecords = query.getRecords();
long currentDriverTime = query.getTimestamp();
if (newRecords == null || currentDriverTime <= 0) {
@ -178,7 +179,16 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
return;
}
for (R record : newRecords) {
if (record.checkExpired(currentDriverTime)) {
if (record.shouldBeDeleted(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
if (getDriver().remove(record)) {
deleteRecords.add(record);
LOG.info("Deleted State Store record {}: {}", recordName, record);
} else {
LOG.warn("Couldn't delete State Store record {}: {}", recordName,
record);
}
} else if (record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
commitRecords.add(record);
@ -187,6 +197,9 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
if (commitRecords.size() > 0) {
getDriver().putAll(commitRecords, true, false);
}
if (deleteRecords.size() > 0) {
newRecords.removeAll(deleteRecords);
}
}
/**
@ -197,7 +210,8 @@ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
* @throws IOException If the values cannot be updated.
*/
public void overrideExpiredRecord(R record) throws IOException {
List<R> newRecords = Collections.singletonList(record);
List<R> newRecords = new ArrayList<>();
newRecords.add(record);
long time = getDriver().getTime();
QueryResult<R> query = new QueryResult<>(newRecords, time);
overrideExpiredRecords(query);

View File

@ -160,15 +160,27 @@ protected void serviceInit(Configuration config) throws Exception {
this.addService(monitorService);
// Set expirations intervals for each record
MembershipState.setExpirationMs(conf.getLong(
MembershipState.setExpirationMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
MembershipState.setDeletionMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS,
RBFConfigKeys
.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
RouterState.setExpirationMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
RouterState.setDeletionMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS,
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
// Cache update service
this.cacheUpdater = new StateStoreCacheUpdateService(this);
addService(this.cacheUpdater);

View File

@ -74,6 +74,26 @@ public abstract class BaseRecord implements Comparable<BaseRecord> {
*/
public abstract long getExpirationMs();
/**
* Check if this record is expired. The base implementation always returns
* false; record types that track an expired state override it. Only records
* reporting true here are considered for deletion by
* {@link #shouldBeDeleted(long)}.
*
* @return True if the record is expired.
*/
public boolean isExpired() {
return false;
}
/**
* Get the deletion time for the expired record, i.e. how long the record may
* stay expired before {@link #shouldBeDeleted(long)} reports it for removal.
* The base implementation returns -1, and any value that is not positive
* keeps the deletion disabled. Override for customized behavior.
*
* @return Deletion time for the expired record.
*/
public long getDeletionMs() {
return -1;
}
/**
* Map of primary key names to values for the record. The primary key can be
* a combination of 1-n different State Store serialized values.
@ -202,12 +222,34 @@ public int compareTo(BaseRecord record) {
*/
public boolean checkExpired(long currentTime) {
long expiration = getExpirationMs();
if (getDateModified() > 0 && expiration > 0) {
return (getDateModified() + expiration) < currentTime;
long modifiedTime = getDateModified();
if (modifiedTime > 0 && expiration > 0) {
return (modifiedTime + expiration) < currentTime;
}
return false;
}
/**
 * Decide whether this record has stayed expired long enough to be removed.
 * The answer is only ever true for a record that reports itself as expired
 * and whose deletion time is positive; in that case the time elapsed since
 * the expiration instant (modification date plus expiration time) must be
 * strictly greater than the deletion time.
 *
 * @param currentTime The current timestamp in ms from the data store, to be
 *                    compared against the modification date of the object.
 * @return boolean True if the record is expired beyond its deletion time
 *                 and should be deleted from the data store.
 */
public boolean shouldBeDeleted(long currentTime) {
  long deletionTime = getDeletionMs();
  if (!isExpired() || deletionTime <= 0) {
    // Not expired, or expired deletion is disabled for this record type.
    return false;
  }
  long expiredAt = getDateModified() + getExpirationMs();
  return currentTime - expiredAt > deletionTime;
}
/**
* Validates the record. Called when the record is created, populated from the
* state store, and before committing to the state store. If validate failed,

View File

@ -48,6 +48,8 @@ public abstract class MembershipState extends BaseRecord
/** Expiration time in ms for this entry. */
private static long expirationMs;
/** Deletion time in ms for this expired entry. */
private static long deletionMs;
/** Comparator based on the name.*/
public static final Comparator<MembershipState> NAME_COMPARATOR =
@ -330,4 +332,23 @@ public long getExpirationMs() {
public static void setExpirationMs(long time) {
MembershipState.expirationMs = time;
}
/**
* A membership record counts as expired once its state is EXPIRED.
*
* @return True if the state of this record is EXPIRED.
*/
@Override
public boolean isExpired() {
return getState() == EXPIRED;
}
/**
* Get the deletion time shared by all membership records, as last set
* through {@link #setDeletionMs(long)}.
*
* @return Deletion time for expired membership records.
*/
@Override
public long getDeletionMs() {
return MembershipState.deletionMs;
}
/**
* Set the deletion time for this class. The value is kept in a static field,
* so it applies to every membership record.
*
* @param time Deletion time in milliseconds.
*/
public static void setDeletionMs(long time) {
MembershipState.deletionMs = time;
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import java.util.Collections;
import java.util.List;
/**
@ -42,7 +41,7 @@ public QueryResult(final List<T> recs, final long time) {
* @return List of records.
*/
public List<T> getRecords() {
return Collections.unmodifiableList(this.records);
return this.records;
}
/**

View File

@ -40,6 +40,9 @@ public abstract class RouterState extends BaseRecord {
/** Expiration time in ms for this entry. */
private static long expirationMs;
/** Deletion time in ms for this entry when it is expired. */
private static long deletionMs;
/**
* Constructors.
*/
@ -169,4 +172,18 @@ public long getExpirationMs() {
public static void setExpirationMs(long time) {
RouterState.expirationMs = time;
}
/**
* A router state record counts as expired once its status is
* {@link RouterServiceState#EXPIRED}.
*
* @return True if the status of this record is EXPIRED.
*/
@Override
public boolean isExpired() {
return getStatus() == RouterServiceState.EXPIRED;
}
/**
* Get the deletion time shared by all router state records, as last set
* through {@link #setDeletionMs(long)}.
*
* @return Deletion time for expired router state records.
*/
@Override
public long getDeletionMs() {
return RouterState.deletionMs;
}
/**
* Set the deletion time for this class. The value is kept in a static field,
* so it applies to every router state record.
*
* @param time Deletion time in milliseconds.
*/
public static void setDeletionMs(long time) {
RouterState.deletionMs = time;
}
}

View File

@ -315,7 +315,9 @@ public long getLastContact() {
@Override
public void setDateModified(long time) {
this.translator.getBuilder().setDateModified(time);
if (getState() != FederationNamenodeServiceState.EXPIRED) {
this.translator.getBuilder().setDateModified(time);
}
}
@Override

View File

@ -182,7 +182,9 @@ public long getDateStarted() {
@Override
public void setDateModified(long time) {
this.translator.getBuilder().setDateModified(time);
if (getStatus() != RouterServiceState.EXPIRED) {
this.translator.getBuilder().setDateModified(time);
}
}
@Override

View File

@ -345,6 +345,16 @@
</description>
</property>
<property>
<name>dfs.federation.router.store.membership.expiration.deletion</name>
<value>-1</value>
<description>
Deletion time in milliseconds for an expired membership record. If a
membership record stays expired for longer than this time, it is deleted
from the State Store. If this value is zero or negative, the deletion is
disabled.
</description>
</property>
<property>
<name>dfs.federation.router.heartbeat.enable</name>
<value>true</value>
@ -391,6 +401,16 @@
</description>
</property>
<property>
<name>dfs.federation.router.store.router.expiration.deletion</name>
<value>-1</value>
<description>
Deletion time in milliseconds for an expired router state record. If a
router state record stays expired for longer than this time, it is deleted
from the State Store. If this value is zero or negative, the deletion is
disabled.
</description>
</property>
<property>
<name>dfs.federation.router.safemode.enable</name>
<value>true</value>

View File

@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
@ -43,6 +44,7 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
@ -57,10 +59,14 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
@BeforeClass
public static void create() {
// Reduce expirations to 5 seconds
// Reduce expirations to 2 seconds
getConf().setLong(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
TimeUnit.SECONDS.toMillis(5));
TimeUnit.SECONDS.toMillis(2));
// Set deletion time to 2 seconds
getConf().setLong(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS,
TimeUnit.SECONDS.toMillis(2));
}
@Before
@ -363,8 +369,8 @@ public void testRegistrationNoQuorum()
}
@Test
public void testRegistrationExpired()
throws InterruptedException, IOException {
public void testRegistrationExpiredAndDeletion()
throws InterruptedException, IOException, TimeoutException {
// Populate the state store with a single NN element
// 1) ns0:nn0 - Active
@ -385,20 +391,32 @@ public void testRegistrationExpired()
assertNotNull(quorumEntry);
assertEquals(ROUTERS[0], quorumEntry.getRouterId());
assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState());
// Wait past expiration (set in conf to 5 seconds)
Thread.sleep(6000);
// Reload cache
assertTrue(getStateStore().loadCache(MembershipStore.class, true));
// Verify entry is now expired and is no longer in the cache
quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]);
quorumEntry = getExpiredNamenodeRegistration(report.getNameserviceId(),
report.getNamenodeId());
assertNull(quorumEntry);
// Wait past expiration (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
assertTrue(getStateStore().loadCache(MembershipStore.class, true));
// Verify entry is expired and is no longer in the cache
return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null;
} catch (IOException e) {
return false;
}
}, 100, 3000);
// Verify entry is in expired membership records
quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]);
assertNotNull(quorumEntry);
// Verify entry is now expired and can't be used by RPC service
quorumEntry = getNamenodeRegistration(
report.getNameserviceId(), report.getNamenodeId());
assertNull(quorumEntry);
quorumEntry = getExpiredNamenodeRegistration(
report.getNameserviceId(), report.getNamenodeId());
assertNotNull(quorumEntry);
// Heartbeat again, updates dateModified
assertTrue(namenodeHeartbeat(report));
@ -411,6 +429,36 @@ public void testRegistrationExpired()
assertNotNull(quorumEntry);
assertEquals(ROUTERS[0], quorumEntry.getRouterId());
assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState());
quorumEntry = getExpiredNamenodeRegistration(
report.getNameserviceId(), report.getNamenodeId());
assertNull(quorumEntry);
// Wait past expiration (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
assertTrue(getStateStore().loadCache(MembershipStore.class, true));
// Verify entry is expired and is no longer in the cache
return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null;
} catch (IOException e) {
return false;
}
}, 100, 3000);
// Verify entry is in expired membership records
quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]);
assertNotNull(quorumEntry);
// Wait past deletion (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
assertTrue(getStateStore().loadCache(MembershipStore.class, true));
// Verify entry is deleted from even the expired membership records
return getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0])
== null;
} catch (IOException e) {
return false;
}
}, 100, 3000);
}
/**
@ -441,6 +489,34 @@ private MembershipState getNamenodeRegistration(
return null;
}
/**
 * Look up the expired membership record of a single namenode in the store.
 *
 * @param nsId The HDFS nameservice ID to search for
 * @param nnId The HDFS namenode ID to search for
 * @return The expired record when the query yields exactly one match; null
 *         when the result list is missing, empty, or has several entries.
 * @throws IOException if the query could not be executed.
 */
private MembershipState getExpiredNamenodeRegistration(
    final String nsId, final String nnId) throws IOException {

  // Partial record used as the query filter.
  MembershipState filter = MembershipState.newInstance();
  filter.setNameserviceId(nsId);
  filter.setNamenodeId(nnId);

  GetNamenodeRegistrationsResponse response =
      membershipStore.getExpiredNamenodeRegistrations(
          GetNamenodeRegistrationsRequest.newInstance(filter));
  List<MembershipState> expired = response.getNamenodeMemberships();

  if (expired == null || expired.size() != 1) {
    return null;
  }
  return expired.get(0);
}
/**
* Register a namenode heartbeat with the state store.
*

View File

@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
@ -36,6 +37,7 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
@ -50,10 +52,14 @@ public class TestStateStoreRouterState extends TestStateStoreBase {
@BeforeClass
public static void create() {
// Reduce expirations to 5 seconds
// Reduce expirations to 2 seconds
getConf().setTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
5, TimeUnit.SECONDS);
2, TimeUnit.SECONDS);
// Set deletion time to 2 seconds
getConf().setTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS,
2, TimeUnit.SECONDS);
}
@Before
@ -130,8 +136,8 @@ public void testUpdateRouterStatus()
}
@Test
public void testRouterStateExpired()
throws IOException, InterruptedException {
public void testRouterStateExpiredAndDeletion()
throws IOException, InterruptedException, TimeoutException {
long dateStarted = Time.now();
String address = "testaddress";
@ -149,17 +155,46 @@ public void testRouterStateExpired()
routerStore.getRouterRegistration(getRequest).getRouter();
assertNotNull(record);
// Wait past expiration (set to 5 sec in config)
Thread.sleep(6000);
// Wait past expiration (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
RouterState routerState = routerStore
.getRouterRegistration(getRequest).getRouter();
// Verify entry is expired
return routerState.getStatus() == RouterServiceState.EXPIRED;
} catch (IOException e) {
return false;
}
}, 100, 3000);
// Verify expired
RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
assertEquals(RouterServiceState.EXPIRED, r.getStatus());
// Heartbeat again and this shouldn't be EXPIRED anymore
// Heartbeat again and this shouldn't be EXPIRED at this point
assertTrue(routerStore.routerHeartbeat(request).getStatus());
r = routerStore.getRouterRegistration(getRequest).getRouter();
RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
assertEquals(RouterServiceState.RUNNING, r.getStatus());
// Wait past expiration (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
RouterState routerState = routerStore
.getRouterRegistration(getRequest).getRouter();
// Verify entry is expired
return routerState.getStatus() == RouterServiceState.EXPIRED;
} catch (IOException e) {
return false;
}
}, 100, 3000);
// Wait deletion (set in conf to 2 seconds)
GenericTestUtils.waitFor(() -> {
try {
RouterState routerState = routerStore
.getRouterRegistration(getRequest).getRouter();
// Verify entry is deleted
return routerState.getStatus() == null;
} catch (IOException e) {
return false;
}
}, 100, 3000);
}
@Test

View File

@ -184,7 +184,10 @@ private boolean validateRecord(
long now = stateStore.getDriver().getTime();
assertTrue(
committed.getDateCreated() <= now && committed.getDateCreated() > 0);
assertTrue(committed.getDateModified() >= committed.getDateCreated());
// since expired record doesn't update the modification time, let's skip it
if (!committed.isExpired()) {
assertTrue(committed.getDateModified() >= committed.getDateCreated());
}
return ret;
}