HDFS-16844: RBF: Adds resiliency when StateStore gets exceptions. (#5138)

Allows the StateStore to stay up when there are errors reading the data.
This commit is contained in:
Owen O'Malley 2022-11-18 17:24:08 +00:00
parent 9b3ffe960e
commit 8ff54dac58
6 changed files with 203 additions and 8 deletions

View File

@ -123,9 +123,13 @@ public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first // Our cache depends on the store, update it first
try { try {
MembershipStore membership = getMembershipStore(); MembershipStore membership = getMembershipStore();
membership.loadCache(force); if (!membership.loadCache(force)) {
return false;
}
DisabledNameserviceStore disabled = getDisabledNameserviceStore(); DisabledNameserviceStore disabled = getDisabledNameserviceStore();
disabled.loadCache(force); if (!disabled.loadCache(force)) {
return false;
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e); LOG.error("Cannot update membership from the State Store", e);
} }

View File

@ -398,7 +398,9 @@ public boolean loadCache(boolean force) {
try { try {
// Our cache depends on the store, update it first // Our cache depends on the store, update it first
MountTableStore mountTable = this.getMountTableStore(); MountTableStore mountTable = this.getMountTableStore();
mountTable.loadCache(force); if (!mountTable.loadCache(force)) {
return false;
}
GetMountTableEntriesRequest request = GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/"); GetMountTableEntriesRequest.newInstance("/");

View File

@ -100,7 +100,7 @@ protected CachedRecordStore(
* @throws StateStoreUnavailableException If the cache is not initialized. * @throws StateStoreUnavailableException If the cache is not initialized.
*/ */
private void checkCacheAvailable() throws StateStoreUnavailableException { private void checkCacheAvailable() throws StateStoreUnavailableException {
if (!this.initialized) { if (!getDriver().isDriverReady() || !this.initialized) {
throw new StateStoreUnavailableException( throw new StateStoreUnavailableException(
"Cached State Store not initialized, " + "Cached State Store not initialized, " +
getRecordClass().getSimpleName() + " records not valid"); getRecordClass().getSimpleName() + " records not valid");
@ -125,7 +125,6 @@ public boolean loadCache(boolean force) throws IOException {
} catch (IOException e) { } catch (IOException e) {
LOG.error("Cannot get \"{}\" records from the State Store", LOG.error("Cannot get \"{}\" records from the State Store",
getRecordClass().getSimpleName()); getRecordClass().getSimpleName());
this.initialized = false;
return false; return false;
} }

View File

@ -185,7 +185,9 @@ public NamenodeHeartbeatResponse namenodeHeartbeat(
@Override @Override
public boolean loadCache(boolean force) throws IOException { public boolean loadCache(boolean force) throws IOException {
super.loadCache(force); if (!super.loadCache(force)) {
return false;
}
// Update local cache atomically // Update local cache atomically
cacheWriteLock.lock(); cacheWriteLock.lock();

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * A mock StateStoreDriver that keeps all of its records in memory and can be
 * switched into a mode where every data operation throws an IOException.
 */
public class MockStateStoreDriver extends StateStoreBaseImpl {

  /** While true, get/putAll/removeAll/remove fail with an IOException. */
  private boolean giveErrors = false;

  /** Set by {@link #initDriver()} and cleared again by {@link #close()}. */
  private boolean initialized = false;

  /** Stored records: record name mapped to (primary key mapped to record). */
  private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();

  /**
   * Marks the driver as initialized.
   * @return always true
   */
  @Override
  public boolean initDriver() {
    initialized = true;
    return true;
  }

  /**
   * Nothing has to be prepared per record type for the in-memory store.
   * @param className name of the record class (unused)
   * @param clazz the record class (unused)
   * @return always true
   */
  @Override
  public <T extends BaseRecord> boolean initRecordStorage(String className,
      Class<T> clazz) {
    return true;
  }

  /**
   * @return true between a call to initDriver and the next call to close
   */
  @Override
  public boolean isDriverReady() {
    return initialized;
  }

  /**
   * Drops every stored record and marks the driver as not initialized.
   */
  @Override
  public void close() throws Exception {
    valueMap.clear();
    initialized = false;
  }

  /**
   * Should this object throw an IOException on each following call?
   * @param value should we throw errors?
   */
  public void setGiveErrors(boolean value) {
    giveErrors = value;
  }

  /**
   * Check to see if this StateStore should throw IOException on each call.
   * @throws IOException thrown if giveErrors has been set
   */
  private void checkErrors() throws IOException {
    if (!giveErrors) {
      return;
    }
    throw new IOException("Induced errors");
  }

  /**
   * Returns a copy of all stored records of the given type, stamped with the
   * current wall-clock time.
   * @param clazz record class to look up
   * @return query result holding the records (empty if none are stored)
   * @throws IOException if induced errors are enabled
   */
  @Override
  @SuppressWarnings("unchecked")
  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
    checkErrors();
    Map<String, BaseRecord> stored = valueMap.get(StateStoreUtils.getRecordName(clazz));
    List<T> found;
    if (stored == null) {
      found = new ArrayList<>();
    } else {
      // Copy so callers never see later changes to the backing map.
      found = new ArrayList<>((Collection<T>) stored.values());
    }
    return new QueryResult<>(found, System.currentTimeMillis());
  }

  /**
   * Stores the given records under their primary keys.
   * An existing record is overwritten only when allowUpdate is set; otherwise
   * it is either reported as an error (errorIfExists) or left untouched.
   * @param records records to store
   * @param allowUpdate overwrite records that already exist
   * @param errorIfExists fail when a record exists and may not be updated
   * @return always true when no exception is thrown
   * @throws IOException if induced errors are enabled, or a record already
   *         exists while errorIfExists is set and allowUpdate is not
   */
  @Override
  public <T extends BaseRecord> boolean putAll(List<T> records,
      boolean allowUpdate,
      boolean errorIfExists)
      throws IOException {
    checkErrors();
    for (T record : records) {
      String recordName = StateStoreUtils.getRecordName(record.getClass());
      Map<String, BaseRecord> stored = valueMap.get(recordName);
      if (stored == null) {
        stored = new HashMap<>();
        valueMap.put(recordName, stored);
      }
      String key = record.getPrimaryKey();
      boolean absent = stored.get(key) == null;
      if (absent || allowUpdate) {
        stored.put(key, record);
        continue;
      }
      if (errorIfExists) {
        throw new IOException("Record already exists for " + record.getClass()
            + ": " + key);
      }
    }
    return true;
  }

  /**
   * Removes every stored record of the given type.
   * @param clazz record class to wipe
   * @return true if there was an entry for that record type
   * @throws IOException if induced errors are enabled
   */
  @Override
  public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
    checkErrors();
    Map<String, BaseRecord> dropped = valueMap.remove(StateStoreUtils.getRecordName(clazz));
    return dropped != null;
  }

  /**
   * Removes the stored records of the given type that match the query.
   * @param clazz record class to search
   * @param query filter deciding which records are removed
   * @return number of records removed
   * @throws IOException if induced errors are enabled
   */
  @Override
  @SuppressWarnings("unchecked")
  public <T extends BaseRecord> int remove(Class<T> clazz,
      Query<T> query)
      throws IOException {
    checkErrors();
    Map<String, BaseRecord> stored =
        valueMap.get(StateStoreUtils.getRecordName(clazz));
    if (stored == null) {
      return 0;
    }
    int removed = 0;
    Iterator<BaseRecord> cursor = stored.values().iterator();
    while (cursor.hasNext()) {
      BaseRecord candidate = cursor.next();
      if (query.matches((T) candidate)) {
        // Remove through the iterator so the map is not modified behind it.
        cursor.remove();
        removed++;
      }
    }
    return removed;
  }
}

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -20,8 +20,16 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.junit.Test; import org.junit.Test;
@ -40,7 +48,7 @@ public class TestRouterState {
private static final RouterServiceState STATE = RouterServiceState.RUNNING; private static final RouterServiceState STATE = RouterServiceState.RUNNING;
private RouterState generateRecord() throws IOException { private RouterState generateRecord() {
RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE); RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
record.setVersion(VERSION); record.setVersion(VERSION);
record.setCompileInfo(COMPILE_INFO); record.setCompileInfo(COMPILE_INFO);
@ -82,4 +90,45 @@ public void testSerialization() throws IOException {
validateRecord(newRecord); validateRecord(newRecord);
} }
/**
 * Verifies that a failed cache refresh keeps the previously loaded cache:
 * after the driver starts throwing IOExceptions, the cache update time is
 * unchanged and the resolver still returns the two namenodes loaded earlier.
 */
@Test
public void testStateStoreResilience() throws Exception {
  StateStoreService service = new StateStoreService();
  Configuration conf = new Configuration();
  conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
      MockStateStoreDriver.class,
      StateStoreDriver.class);
  conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
  service.init(conf);
  try {
    MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
    // Add two records for block1
    driver.put(MembershipState.newInstance("routerId", "ns1",
        "ns1-ha1", "cluster1", "block1", "rpc1",
        "service1", "lifeline1", "https", "nn01",
        FederationNamenodeServiceState.ACTIVE, false), false, false);
    driver.put(MembershipState.newInstance("routerId", "ns1",
        "ns1-ha2", "cluster1", "block1", "rpc2",
        "service2", "lifeline2", "https", "nn02",
        FederationNamenodeServiceState.STANDBY, false), false, false);
    // load the cache
    service.loadDriver();
    MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service);
    service.refreshCaches(true);
    // look up block1
    List<? extends FederationNamenodeContext> result =
        resolver.getNamenodesForBlockPoolId("block1");
    assertEquals(2, result.size());
    // cause io errors and then reload the cache
    driver.setGiveErrors(true);
    long previousUpdate = service.getCacheUpdateTime();
    service.refreshCaches(true);
    // a failed refresh must not advance the cache update time
    assertEquals(previousUpdate, service.getCacheUpdateTime());
    // make sure the old cache is still there
    result = resolver.getNamenodesForBlockPoolId("block1");
    assertEquals(2, result.size());
  } finally {
    // Stop the service even when an assertion or call above fails, so a
    // failing run does not leave the initialized service behind.
    service.stop();
  }
}
} }