HDFS-16851: RBF: Add a utility to dump the StateStore. (#5155)

This commit is contained in:
Owen O'Malley 2022-11-29 22:12:35 +00:00 committed by GitHub
parent 0ef572abed
commit 03471a736c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 177 additions and 10 deletions

View File

@ -272,6 +272,15 @@ public <T extends RecordStore<?>> T getRegisteredRecordStore(
return null;
}
/**
* Get the list of all RecordStores.
* @return a list of each RecordStore.
*/
@SuppressWarnings("unchecked")
public <T extends RecordStore<? extends BaseRecord>> List<T> getRecordStores() {
return new ArrayList<>((Collection<T>) recordStores.values());
}
/**
* List of records supported by this State Store.
*

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.tools.federation;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
@ -26,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -46,6 +48,10 @@
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager;
import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@ -70,7 +76,9 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshResponse;
@ -97,6 +105,7 @@
public class RouterAdmin extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
private static final String DUMP_COMMAND = "-dumpState";
private RouterClient client;
@ -133,7 +142,7 @@ private String getUsage(String cmd) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
"-safemode", "-nameservice", "-getDisabledNameservices",
DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh", "-refreshRouterArgs",
"-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"};
StringBuilder usage = new StringBuilder();
@ -187,6 +196,8 @@ private String getUsage(String cmd) {
return "\t[-refreshSuperUserGroupsConfiguration]";
} else if (cmd.equals("-refreshCallQueue")) {
return "\t[-refreshCallQueue]";
} else if (cmd.equals(DUMP_COMMAND)) {
return "\t[" + DUMP_COMMAND + "]";
}
return getUsage(null);
}
@ -224,7 +235,8 @@ private void validateMax(String[] arg) {
if (arg.length > 1) {
throw new IllegalArgumentException("No arguments allowed");
}
} else if (arg[0].equals("-refreshCallQueue")) {
} else if (arg[0].equals("-refreshCallQueue") ||
arg[0].equals(DUMP_COMMAND)) {
if (arg.length > 1) {
throw new IllegalArgumentException("No arguments allowed");
}
@ -286,6 +298,15 @@ private boolean validateMin(String[] argv) {
return true;
}
/**
* Does this command run in the local process?
* @param cmd the string of the command
* @return is this a local command?
*/
boolean isLocalCommand(String cmd) {
return DUMP_COMMAND.equals(cmd);
}
@Override
public int run(String[] argv) throws Exception {
if (argv.length < 1) {
@ -303,6 +324,10 @@ public int run(String[] argv) throws Exception {
System.err.println("Not enough parameters specificed for cmd " + cmd);
printUsage(cmd);
return exitCode;
} else if (isLocalCommand(argv[0])) {
if (DUMP_COMMAND.equals(argv[0])) {
return dumpStateStore(getConf(), System.out) ? 0 : -1;
}
}
String address = null;
// Initialize RouterClient
@ -1301,6 +1326,49 @@ private int refreshCallQueue() throws IOException {
return returnCode;
}
/**
* Dumps the contents of the StateStore to stdout.
* @return true if it was successful
*/
public static boolean dumpStateStore(Configuration conf,
PrintStream output) throws IOException {
StateStoreService service = new StateStoreService();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
service.init(conf);
service.loadDriver();
if (!service.isDriverReady()) {
System.err.println("Can't initialize driver");
return false;
}
// Get the stores sorted by name
Map<String, RecordStore<? extends BaseRecord>> stores = new TreeMap<>();
for(RecordStore<? extends BaseRecord> store: service.getRecordStores()) {
String recordName = StateStoreUtils.getRecordName(store.getRecordClass());
stores.put(recordName, store);
}
for (Entry<String, RecordStore<? extends BaseRecord>> pair: stores.entrySet()) {
String recordName = pair.getKey();
RecordStore<? extends BaseRecord> store = pair.getValue();
output.println("---- " + recordName + " ----");
if (store instanceof CachedRecordStore) {
for (Object record: ((CachedRecordStore) store).getCachedRecords()) {
if (record instanceof BaseRecord && record instanceof PBRecord) {
BaseRecord baseRecord = (BaseRecord) record;
// Generate the pseudo-json format of the protobuf record
String recordString = ((PBRecord) record).getProto().toString();
// Indent each line
recordString = " " + recordString.replaceAll("\n", "\n ");
output.println(String.format(" %s:", baseRecord.getPrimaryKey()));
output.println(recordString);
}
}
output.println();
}
}
service.stop();
return true;
}
/**
* Normalize a path for that filesystem.
*
@ -1341,4 +1409,4 @@ public FsPermission getMode() {
return mode;
}
}
}
}

View File

@ -328,6 +328,17 @@ To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn]
### Router state dump
To diagnose the current state of the routers, you can use the
dumpState command. It generates a text dump of the records in the
State Store. Since it uses the configuration to find and read the
state store, it is often easiest to use the machine where the routers
run. The command runs locally, so the routers do not have to be up to
use this command.
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -dumpState
Client configuration
--------------------

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -42,16 +42,20 @@
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MockStateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.security.UserGroupInformation;
@ -852,6 +856,7 @@ public void testInvalidArgumentMessage() throws Exception {
+ " <quota in bytes or quota size string>]\n"
+ "\t[-clrQuota <path>]\n"
+ "\t[-clrStorageTypeQuota <path>]\n"
+ "\t[-dumpState]\n"
+ "\t[-safemode enter | leave | get]\n"
+ "\t[-nameservice enable | disable <nameservice>]\n"
+ "\t[-getDisabledNameservices]\n"
@ -1759,6 +1764,72 @@ public void testRefreshCallQueue() throws Exception {
assertTrue(err.toString().contains("No arguments allowed"));
}
@Test
public void testDumpState() throws Exception {
MockStateStoreDriver driver = new MockStateStoreDriver();
driver.clearAll();
// Add two records for block1
driver.put(MembershipState.newInstance("routerId", "ns1",
"ns1-ha1", "cluster1", "block1", "rpc1",
"service1", "lifeline1", "https", "nn01",
FederationNamenodeServiceState.ACTIVE, false), false, false);
driver.put(MembershipState.newInstance("routerId", "ns1",
"ns1-ha2", "cluster1", "block1", "rpc2",
"service2", "lifeline2", "https", "nn02",
FederationNamenodeServiceState.STANDBY, false), false, false);
Configuration conf = new Configuration();
conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
MockStateStoreDriver.class,
StateStoreDriver.class);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
try (PrintStream stream = new PrintStream(buffer)) {
RouterAdmin.dumpStateStore(conf, stream);
}
final String expected =
"---- DisabledNameservice ----\n" +
"\n" +
"---- MembershipState ----\n" +
" ns1-ha1-ns1-routerId:\n" +
" dateCreated: XXX\n" +
" dateModified: XXX\n" +
" routerId: \"routerId\"\n" +
" nameserviceId: \"ns1\"\n" +
" namenodeId: \"ns1-ha1\"\n" +
" clusterId: \"cluster1\"\n" +
" blockPoolId: \"block1\"\n" +
" webAddress: \"nn01\"\n" +
" rpcAddress: \"rpc1\"\n" +
" serviceAddress: \"service1\"\n" +
" lifelineAddress: \"lifeline1\"\n" +
" state: \"ACTIVE\"\n" +
" isSafeMode: false\n" +
" webScheme: \"https\"\n" +
" \n" +
" ns1-ha2-ns1-routerId:\n" +
" dateCreated: XXX\n" +
" dateModified: XXX\n" +
" routerId: \"routerId\"\n" +
" nameserviceId: \"ns1\"\n" +
" namenodeId: \"ns1-ha2\"\n" +
" clusterId: \"cluster1\"\n" +
" blockPoolId: \"block1\"\n" +
" webAddress: \"nn02\"\n" +
" rpcAddress: \"rpc2\"\n" +
" serviceAddress: \"service2\"\n" +
" lifelineAddress: \"lifeline2\"\n" +
" state: \"STANDBY\"\n" +
" isSafeMode: false\n" +
" webScheme: \"https\"\n" +
" \n" +
"\n" +
"---- MountTable ----\n" +
"\n" +
"---- RouterState ----";
// Replace the time values with XXX
assertEquals(expected,
buffer.toString().trim().replaceAll("[0-9]{4,}+", "XXX"));
}
private void addMountTable(String src, String nsId, String dst)
throws Exception {
String[] argv = new String[] {"-add", src, nsId, dst};

View File

@ -35,7 +35,7 @@
public class MockStateStoreDriver extends StateStoreBaseImpl {
private boolean giveErrors = false;
private boolean initialized = false;
private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
private static final Map<String, Map<String, BaseRecord>> VALUE_MAP = new HashMap<>();
@Override
public boolean initDriver() {
@ -56,7 +56,7 @@ public boolean isDriverReady() {
@Override
public void close() throws Exception {
valueMap.clear();
VALUE_MAP.clear();
initialized = false;
}
@ -82,7 +82,7 @@ private void checkErrors() throws IOException {
@SuppressWarnings("unchecked")
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
checkErrors();
Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz));
Map<String, BaseRecord> map = VALUE_MAP.get(StateStoreUtils.getRecordName(clazz));
List<T> results =
map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>();
return new QueryResult<>(results, System.currentTimeMillis());
@ -96,7 +96,7 @@ public <T extends BaseRecord> boolean putAll(List<T> records,
checkErrors();
for (T record : records) {
Map<String, BaseRecord> map =
valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
VALUE_MAP.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
k -> new HashMap<>());
String key = record.getPrimaryKey();
BaseRecord oldRecord = map.get(key);
@ -110,10 +110,17 @@ public <T extends BaseRecord> boolean putAll(List<T> records,
return true;
}
/**
* Clear all records from the store.
*/
public void clearAll() {
VALUE_MAP.clear();
}
@Override
public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
checkErrors();
return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null;
return VALUE_MAP.remove(StateStoreUtils.getRecordName(clazz)) != null;
}
@Override
@ -124,7 +131,7 @@ public <T extends BaseRecord> int remove(Class<T> clazz,
checkErrors();
int result = 0;
Map<String, BaseRecord> map =
valueMap.get(StateStoreUtils.getRecordName(clazz));
VALUE_MAP.get(StateStoreUtils.getRecordName(clazz));
if (map != null) {
for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) {
BaseRecord record = itr.next();

View File

@ -101,6 +101,7 @@ public void testStateStoreResilience() throws Exception {
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
service.init(conf);
MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
driver.clearAll();
// Add two records for block1
driver.put(MembershipState.newInstance("routerId", "ns1",
"ns1-ha1", "cluster1", "block1", "rpc1",