HDFS-16851: RBF: Add a utility to dump the StateStore. (#5155)
This commit is contained in:
parent
f29d9a11bc
commit
6b23e70539
@ -272,6 +272,15 @@ public <T extends RecordStore<?>> T getRegisteredRecordStore(
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of all RecordStores.
|
||||||
|
* @return a list of each RecordStore.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T extends RecordStore<? extends BaseRecord>> List<T> getRecordStores() {
|
||||||
|
return new ArrayList<>((Collection<T>) recordStores.values());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of records supported by this State Store.
|
* List of records supported by this State Store.
|
||||||
*
|
*
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.tools.federation;
|
package org.apache.hadoop.hdfs.tools.federation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -26,6 +27,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
@ -46,6 +48,10 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager;
|
import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
|
||||||
@ -70,7 +76,9 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RefreshResponse;
|
import org.apache.hadoop.ipc.RefreshResponse;
|
||||||
@ -97,6 +105,7 @@
|
|||||||
public class RouterAdmin extends Configured implements Tool {
|
public class RouterAdmin extends Configured implements Tool {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
|
||||||
|
private static final String DUMP_COMMAND = "-dumpState";
|
||||||
|
|
||||||
private RouterClient client;
|
private RouterClient client;
|
||||||
|
|
||||||
@ -133,7 +142,7 @@ private String getUsage(String cmd) {
|
|||||||
String[] commands =
|
String[] commands =
|
||||||
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
|
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
|
||||||
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
|
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
|
||||||
"-safemode", "-nameservice", "-getDisabledNameservices",
|
DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices",
|
||||||
"-refresh", "-refreshRouterArgs",
|
"-refresh", "-refreshRouterArgs",
|
||||||
"-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"};
|
"-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"};
|
||||||
StringBuilder usage = new StringBuilder();
|
StringBuilder usage = new StringBuilder();
|
||||||
@ -187,6 +196,8 @@ private String getUsage(String cmd) {
|
|||||||
return "\t[-refreshSuperUserGroupsConfiguration]";
|
return "\t[-refreshSuperUserGroupsConfiguration]";
|
||||||
} else if (cmd.equals("-refreshCallQueue")) {
|
} else if (cmd.equals("-refreshCallQueue")) {
|
||||||
return "\t[-refreshCallQueue]";
|
return "\t[-refreshCallQueue]";
|
||||||
|
} else if (cmd.equals(DUMP_COMMAND)) {
|
||||||
|
return "\t[" + DUMP_COMMAND + "]";
|
||||||
}
|
}
|
||||||
return getUsage(null);
|
return getUsage(null);
|
||||||
}
|
}
|
||||||
@ -224,7 +235,8 @@ private void validateMax(String[] arg) {
|
|||||||
if (arg.length > 1) {
|
if (arg.length > 1) {
|
||||||
throw new IllegalArgumentException("No arguments allowed");
|
throw new IllegalArgumentException("No arguments allowed");
|
||||||
}
|
}
|
||||||
} else if (arg[0].equals("-refreshCallQueue")) {
|
} else if (arg[0].equals("-refreshCallQueue") ||
|
||||||
|
arg[0].equals(DUMP_COMMAND)) {
|
||||||
if (arg.length > 1) {
|
if (arg.length > 1) {
|
||||||
throw new IllegalArgumentException("No arguments allowed");
|
throw new IllegalArgumentException("No arguments allowed");
|
||||||
}
|
}
|
||||||
@ -286,6 +298,15 @@ private boolean validateMin(String[] argv) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does this command run in the local process?
|
||||||
|
* @param cmd the string of the command
|
||||||
|
* @return is this a local command?
|
||||||
|
*/
|
||||||
|
boolean isLocalCommand(String cmd) {
|
||||||
|
return DUMP_COMMAND.equals(cmd);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] argv) throws Exception {
|
public int run(String[] argv) throws Exception {
|
||||||
if (argv.length < 1) {
|
if (argv.length < 1) {
|
||||||
@ -303,6 +324,10 @@ public int run(String[] argv) throws Exception {
|
|||||||
System.err.println("Not enough parameters specificed for cmd " + cmd);
|
System.err.println("Not enough parameters specificed for cmd " + cmd);
|
||||||
printUsage(cmd);
|
printUsage(cmd);
|
||||||
return exitCode;
|
return exitCode;
|
||||||
|
} else if (isLocalCommand(argv[0])) {
|
||||||
|
if (DUMP_COMMAND.equals(argv[0])) {
|
||||||
|
return dumpStateStore(getConf(), System.out) ? 0 : -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
String address = null;
|
String address = null;
|
||||||
// Initialize RouterClient
|
// Initialize RouterClient
|
||||||
@ -1301,6 +1326,49 @@ private int refreshCallQueue() throws IOException {
|
|||||||
return returnCode;
|
return returnCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dumps the contents of the StateStore to stdout.
|
||||||
|
* @return true if it was successful
|
||||||
|
*/
|
||||||
|
public static boolean dumpStateStore(Configuration conf,
|
||||||
|
PrintStream output) throws IOException {
|
||||||
|
StateStoreService service = new StateStoreService();
|
||||||
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
|
||||||
|
service.init(conf);
|
||||||
|
service.loadDriver();
|
||||||
|
if (!service.isDriverReady()) {
|
||||||
|
System.err.println("Can't initialize driver");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Get the stores sorted by name
|
||||||
|
Map<String, RecordStore<? extends BaseRecord>> stores = new TreeMap<>();
|
||||||
|
for(RecordStore<? extends BaseRecord> store: service.getRecordStores()) {
|
||||||
|
String recordName = StateStoreUtils.getRecordName(store.getRecordClass());
|
||||||
|
stores.put(recordName, store);
|
||||||
|
}
|
||||||
|
for (Entry<String, RecordStore<? extends BaseRecord>> pair: stores.entrySet()) {
|
||||||
|
String recordName = pair.getKey();
|
||||||
|
RecordStore<? extends BaseRecord> store = pair.getValue();
|
||||||
|
output.println("---- " + recordName + " ----");
|
||||||
|
if (store instanceof CachedRecordStore) {
|
||||||
|
for (Object record: ((CachedRecordStore) store).getCachedRecords()) {
|
||||||
|
if (record instanceof BaseRecord && record instanceof PBRecord) {
|
||||||
|
BaseRecord baseRecord = (BaseRecord) record;
|
||||||
|
// Generate the pseudo-json format of the protobuf record
|
||||||
|
String recordString = ((PBRecord) record).getProto().toString();
|
||||||
|
// Indent each line
|
||||||
|
recordString = " " + recordString.replaceAll("\n", "\n ");
|
||||||
|
output.println(String.format(" %s:", baseRecord.getPrimaryKey()));
|
||||||
|
output.println(recordString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output.println();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
service.stop();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Normalize a path for that filesystem.
|
* Normalize a path for that filesystem.
|
||||||
*
|
*
|
||||||
@ -1341,4 +1409,4 @@ public FsPermission getMode() {
|
|||||||
return mode;
|
return mode;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,6 +316,17 @@ To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\
|
|||||||
|
|
||||||
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn]
|
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn]
|
||||||
|
|
||||||
|
### Router state dump
|
||||||
|
|
||||||
|
To diagnose the current state of the routers, you can use the
|
||||||
|
dumpState command. It generates a text dump of the records in the
|
||||||
|
State Store. Since it uses the configuration to find and read the
|
||||||
|
state store, it is often easiest to use the machine where the routers
|
||||||
|
run. The command runs locally, so the routers do not have to be up to
|
||||||
|
use this command.
|
||||||
|
|
||||||
|
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -dumpState
|
||||||
|
|
||||||
Client configuration
|
Client configuration
|
||||||
--------------------
|
--------------------
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
@ -42,16 +42,20 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
|
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl;
|
import org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
|
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.MockStateStoreDriver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
|
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -671,6 +675,7 @@ public void testInvalidArgumentMessage() throws Exception {
|
|||||||
+ " <quota in bytes or quota size string>]\n"
|
+ " <quota in bytes or quota size string>]\n"
|
||||||
+ "\t[-clrQuota <path>]\n"
|
+ "\t[-clrQuota <path>]\n"
|
||||||
+ "\t[-clrStorageTypeQuota <path>]\n"
|
+ "\t[-clrStorageTypeQuota <path>]\n"
|
||||||
|
+ "\t[-dumpState]\n"
|
||||||
+ "\t[-safemode enter | leave | get]\n"
|
+ "\t[-safemode enter | leave | get]\n"
|
||||||
+ "\t[-nameservice enable | disable <nameservice>]\n"
|
+ "\t[-nameservice enable | disable <nameservice>]\n"
|
||||||
+ "\t[-getDisabledNameservices]\n"
|
+ "\t[-getDisabledNameservices]\n"
|
||||||
@ -1578,6 +1583,72 @@ public void testRefreshCallQueue() throws Exception {
|
|||||||
assertTrue(err.toString().contains("No arguments allowed"));
|
assertTrue(err.toString().contains("No arguments allowed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDumpState() throws Exception {
|
||||||
|
MockStateStoreDriver driver = new MockStateStoreDriver();
|
||||||
|
driver.clearAll();
|
||||||
|
// Add two records for block1
|
||||||
|
driver.put(MembershipState.newInstance("routerId", "ns1",
|
||||||
|
"ns1-ha1", "cluster1", "block1", "rpc1",
|
||||||
|
"service1", "lifeline1", "https", "nn01",
|
||||||
|
FederationNamenodeServiceState.ACTIVE, false), false, false);
|
||||||
|
driver.put(MembershipState.newInstance("routerId", "ns1",
|
||||||
|
"ns1-ha2", "cluster1", "block1", "rpc2",
|
||||||
|
"service2", "lifeline2", "https", "nn02",
|
||||||
|
FederationNamenodeServiceState.STANDBY, false), false, false);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
|
||||||
|
MockStateStoreDriver.class,
|
||||||
|
StateStoreDriver.class);
|
||||||
|
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||||
|
try (PrintStream stream = new PrintStream(buffer)) {
|
||||||
|
RouterAdmin.dumpStateStore(conf, stream);
|
||||||
|
}
|
||||||
|
final String expected =
|
||||||
|
"---- DisabledNameservice ----\n" +
|
||||||
|
"\n" +
|
||||||
|
"---- MembershipState ----\n" +
|
||||||
|
" ns1-ha1-ns1-routerId:\n" +
|
||||||
|
" dateCreated: XXX\n" +
|
||||||
|
" dateModified: XXX\n" +
|
||||||
|
" routerId: \"routerId\"\n" +
|
||||||
|
" nameserviceId: \"ns1\"\n" +
|
||||||
|
" namenodeId: \"ns1-ha1\"\n" +
|
||||||
|
" clusterId: \"cluster1\"\n" +
|
||||||
|
" blockPoolId: \"block1\"\n" +
|
||||||
|
" webAddress: \"nn01\"\n" +
|
||||||
|
" rpcAddress: \"rpc1\"\n" +
|
||||||
|
" serviceAddress: \"service1\"\n" +
|
||||||
|
" lifelineAddress: \"lifeline1\"\n" +
|
||||||
|
" state: \"ACTIVE\"\n" +
|
||||||
|
" isSafeMode: false\n" +
|
||||||
|
" webScheme: \"https\"\n" +
|
||||||
|
" \n" +
|
||||||
|
" ns1-ha2-ns1-routerId:\n" +
|
||||||
|
" dateCreated: XXX\n" +
|
||||||
|
" dateModified: XXX\n" +
|
||||||
|
" routerId: \"routerId\"\n" +
|
||||||
|
" nameserviceId: \"ns1\"\n" +
|
||||||
|
" namenodeId: \"ns1-ha2\"\n" +
|
||||||
|
" clusterId: \"cluster1\"\n" +
|
||||||
|
" blockPoolId: \"block1\"\n" +
|
||||||
|
" webAddress: \"nn02\"\n" +
|
||||||
|
" rpcAddress: \"rpc2\"\n" +
|
||||||
|
" serviceAddress: \"service2\"\n" +
|
||||||
|
" lifelineAddress: \"lifeline2\"\n" +
|
||||||
|
" state: \"STANDBY\"\n" +
|
||||||
|
" isSafeMode: false\n" +
|
||||||
|
" webScheme: \"https\"\n" +
|
||||||
|
" \n" +
|
||||||
|
"\n" +
|
||||||
|
"---- MountTable ----\n" +
|
||||||
|
"\n" +
|
||||||
|
"---- RouterState ----";
|
||||||
|
// Replace the time values with XXX
|
||||||
|
assertEquals(expected,
|
||||||
|
buffer.toString().trim().replaceAll("[0-9]{4,}+", "XXX"));
|
||||||
|
}
|
||||||
|
|
||||||
private void addMountTable(String src, String nsId, String dst)
|
private void addMountTable(String src, String nsId, String dst)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String[] argv = new String[] {"-add", src, nsId, dst};
|
String[] argv = new String[] {"-add", src, nsId, dst};
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
public class MockStateStoreDriver extends StateStoreBaseImpl {
|
public class MockStateStoreDriver extends StateStoreBaseImpl {
|
||||||
private boolean giveErrors = false;
|
private boolean giveErrors = false;
|
||||||
private boolean initialized = false;
|
private boolean initialized = false;
|
||||||
private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
|
private static final Map<String, Map<String, BaseRecord>> VALUE_MAP = new HashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean initDriver() {
|
public boolean initDriver() {
|
||||||
@ -56,7 +56,7 @@ public boolean isDriverReady() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
valueMap.clear();
|
VALUE_MAP.clear();
|
||||||
initialized = false;
|
initialized = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,7 +82,7 @@ private void checkErrors() throws IOException {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
|
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
|
||||||
checkErrors();
|
checkErrors();
|
||||||
Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz));
|
Map<String, BaseRecord> map = VALUE_MAP.get(StateStoreUtils.getRecordName(clazz));
|
||||||
List<T> results =
|
List<T> results =
|
||||||
map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>();
|
map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>();
|
||||||
return new QueryResult<>(results, System.currentTimeMillis());
|
return new QueryResult<>(results, System.currentTimeMillis());
|
||||||
@ -96,7 +96,7 @@ public <T extends BaseRecord> boolean putAll(List<T> records,
|
|||||||
checkErrors();
|
checkErrors();
|
||||||
for (T record : records) {
|
for (T record : records) {
|
||||||
Map<String, BaseRecord> map =
|
Map<String, BaseRecord> map =
|
||||||
valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
|
VALUE_MAP.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
|
||||||
k -> new HashMap<>());
|
k -> new HashMap<>());
|
||||||
String key = record.getPrimaryKey();
|
String key = record.getPrimaryKey();
|
||||||
BaseRecord oldRecord = map.get(key);
|
BaseRecord oldRecord = map.get(key);
|
||||||
@ -110,10 +110,17 @@ public <T extends BaseRecord> boolean putAll(List<T> records,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear all records from the store.
|
||||||
|
*/
|
||||||
|
public void clearAll() {
|
||||||
|
VALUE_MAP.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
|
public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
|
||||||
checkErrors();
|
checkErrors();
|
||||||
return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null;
|
return VALUE_MAP.remove(StateStoreUtils.getRecordName(clazz)) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -124,7 +131,7 @@ public <T extends BaseRecord> int remove(Class<T> clazz,
|
|||||||
checkErrors();
|
checkErrors();
|
||||||
int result = 0;
|
int result = 0;
|
||||||
Map<String, BaseRecord> map =
|
Map<String, BaseRecord> map =
|
||||||
valueMap.get(StateStoreUtils.getRecordName(clazz));
|
VALUE_MAP.get(StateStoreUtils.getRecordName(clazz));
|
||||||
if (map != null) {
|
if (map != null) {
|
||||||
for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) {
|
for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) {
|
||||||
BaseRecord record = itr.next();
|
BaseRecord record = itr.next();
|
||||||
|
@ -101,6 +101,7 @@ public void testStateStoreResilience() throws Exception {
|
|||||||
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
|
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
|
||||||
service.init(conf);
|
service.init(conf);
|
||||||
MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
|
MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
|
||||||
|
driver.clearAll();
|
||||||
// Add two records for block1
|
// Add two records for block1
|
||||||
driver.put(MembershipState.newInstance("routerId", "ns1",
|
driver.put(MembershipState.newInstance("routerId", "ns1",
|
||||||
"ns1-ha1", "cluster1", "block1", "rpc1",
|
"ns1-ha1", "cluster1", "block1", "rpc1",
|
||||||
|
Loading…
Reference in New Issue
Block a user