From 6b23e70539cfae6d47c20cb5c4a0ad4261964ca4 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Tue, 29 Nov 2022 22:12:35 +0000 Subject: [PATCH] HDFS-16851: RBF: Add a utility to dump the StateStore. (#5155) --- .../federation/store/StateStoreService.java | 9 +++ .../hdfs/tools/federation/RouterAdmin.java | 74 ++++++++++++++++++- .../src/site/markdown/HDFSRouterFederation.md | 11 +++ .../federation/router/TestRouterAdminCLI.java | 73 +++++++++++++++++- .../store/records/MockStateStoreDriver.java | 19 +++-- .../store/records/TestRouterState.java | 1 + 6 files changed, 177 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 95a3858832..a401805794 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -272,6 +272,15 @@ public > T getRegisteredRecordStore( return null; } + /** + * Get the list of all RecordStores. + * @return a list of each RecordStore. + */ + @SuppressWarnings("unchecked") + public > List getRecordStores() { + return new ArrayList<>((Collection) recordStores.values()); + } + /** * List of records supported by this State Store. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index b8e7c796a1..f7a9424e69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.tools.federation; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.regex.Pattern; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,6 +48,10 @@ import org.apache.hadoop.hdfs.server.federation.router.RouterClient; import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager; +import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore; +import org.apache.hadoop.hdfs.server.federation.store.RecordStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest; @@ -70,7 +76,9 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RefreshResponse; @@ -97,6 +105,7 @@ public class RouterAdmin extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class); + private static final String DUMP_COMMAND = "-dumpState"; private RouterClient client; @@ -133,7 +142,7 @@ private String getUsage(String cmd) { String[] commands = {"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota", "-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota", - "-safemode", "-nameservice", "-getDisabledNameservices", + DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices", "-refresh", "-refreshRouterArgs", "-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"}; StringBuilder usage = new StringBuilder(); @@ -187,6 +196,8 @@ private String getUsage(String cmd) { return "\t[-refreshSuperUserGroupsConfiguration]"; } else if (cmd.equals("-refreshCallQueue")) { return "\t[-refreshCallQueue]"; + } else if (cmd.equals(DUMP_COMMAND)) { + return "\t[" + DUMP_COMMAND + "]"; } return getUsage(null); } @@ -224,7 +235,8 @@ private void validateMax(String[] arg) { if (arg.length > 1) { throw new IllegalArgumentException("No arguments allowed"); } - } else if (arg[0].equals("-refreshCallQueue")) { + } else if (arg[0].equals("-refreshCallQueue") || + arg[0].equals(DUMP_COMMAND)) { if (arg.length > 1) { throw new IllegalArgumentException("No arguments allowed"); } @@ -286,6 +298,15 @@ private boolean validateMin(String[] argv) { return true; } + /** + * Does this command run in the local process? + * @param cmd the string of the command + * @return is this a local command? + */ + boolean isLocalCommand(String cmd) { + return DUMP_COMMAND.equals(cmd); + } + @Override public int run(String[] argv) throws Exception { if (argv.length < 1) { @@ -303,6 +324,10 @@ public int run(String[] argv) throws Exception { System.err.println("Not enough parameters specificed for cmd " + cmd); printUsage(cmd); return exitCode; + } else if (isLocalCommand(argv[0])) { + if (DUMP_COMMAND.equals(argv[0])) { + return dumpStateStore(getConf(), System.out) ? 0 : -1; + } } String address = null; // Initialize RouterClient @@ -1301,6 +1326,49 @@ private int refreshCallQueue() throws IOException { return returnCode; } + /** + * Dumps the contents of the StateStore to stdout. + * @return true if it was successful + */ + public static boolean dumpStateStore(Configuration conf, + PrintStream output) throws IOException { + StateStoreService service = new StateStoreService(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false); + service.init(conf); + service.loadDriver(); + if (!service.isDriverReady()) { + System.err.println("Can't initialize driver"); + return false; + } + // Get the stores sorted by name + Map> stores = new TreeMap<>(); + for(RecordStore store: service.getRecordStores()) { + String recordName = StateStoreUtils.getRecordName(store.getRecordClass()); + stores.put(recordName, store); + } + for (Entry> pair: stores.entrySet()) { + String recordName = pair.getKey(); + RecordStore store = pair.getValue(); + output.println("---- " + recordName + " ----"); + if (store instanceof CachedRecordStore) { + for (Object record: ((CachedRecordStore) store).getCachedRecords()) { + if (record instanceof BaseRecord && record instanceof PBRecord) { + BaseRecord baseRecord = (BaseRecord) record; + // Generate the pseudo-json format of the protobuf record + String recordString = ((PBRecord) record).getProto().toString(); + // Indent each line + recordString = " " + recordString.replaceAll("\n", "\n "); + output.println(String.format(" %s:", baseRecord.getPrimaryKey())); + output.println(recordString); + } + } + output.println(); + } + } + service.stop(); + return true; + } + /** * Normalize a path for that filesystem. * @@ -1341,4 +1409,4 @@ public FsPermission getMode() { return mode; } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index dce2c65466..a1da6c0ef4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -316,6 +316,17 @@ To trigger a runtime-refresh of the resource specified by \ on \ [arg1..argn] +### Router state dump + +To diagnose the current state of the routers, you can use the +dumpState command. It generates a text dump of the records in the +State Store. Since it uses the configuration to find and read the +state store, it is often easiest to use the machine where the routers +run. The command runs locally, so the routers do not have to be up to +use this command. + + [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -dumpState + Client configuration -------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index db6925b10e..559a827dde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,16 +42,20 @@ import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MockStateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; import org.apache.hadoop.security.UserGroupInformation; @@ -671,6 +675,7 @@ public void testInvalidArgumentMessage() throws Exception { + " ]\n" + "\t[-clrQuota ]\n" + "\t[-clrStorageTypeQuota ]\n" + + "\t[-dumpState]\n" + "\t[-safemode enter | leave | get]\n" + "\t[-nameservice enable | disable ]\n" + "\t[-getDisabledNameservices]\n" @@ -1578,6 +1583,72 @@ public void testRefreshCallQueue() throws Exception { assertTrue(err.toString().contains("No arguments allowed")); } + @Test + public void testDumpState() throws Exception { + MockStateStoreDriver driver = new MockStateStoreDriver(); + driver.clearAll(); + // Add two records for block1 + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha1", "cluster1", "block1", "rpc1", + "service1", "lifeline1", "https", "nn01", + FederationNamenodeServiceState.ACTIVE, false), false, false); + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha2", "cluster1", "block1", "rpc2", + "service2", "lifeline2", "https", "nn02", + FederationNamenodeServiceState.STANDBY, false), false, false); + Configuration conf = new Configuration(); + conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + MockStateStoreDriver.class, + StateStoreDriver.class); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + try (PrintStream stream = new PrintStream(buffer)) { + RouterAdmin.dumpStateStore(conf, stream); + } + final String expected = + "---- DisabledNameservice ----\n" + + "\n" + + "---- MembershipState ----\n" + + " ns1-ha1-ns1-routerId:\n" + + " dateCreated: XXX\n" + + " dateModified: XXX\n" + + " routerId: \"routerId\"\n" + + " nameserviceId: \"ns1\"\n" + + " namenodeId: \"ns1-ha1\"\n" + + " clusterId: \"cluster1\"\n" + + " blockPoolId: \"block1\"\n" + + " webAddress: \"nn01\"\n" + + " rpcAddress: \"rpc1\"\n" + + " serviceAddress: \"service1\"\n" + + " lifelineAddress: \"lifeline1\"\n" + + " state: \"ACTIVE\"\n" + + " isSafeMode: false\n" + + " webScheme: \"https\"\n" + + " \n" + + " ns1-ha2-ns1-routerId:\n" + + " dateCreated: XXX\n" + + " dateModified: XXX\n" + + " routerId: \"routerId\"\n" + + " nameserviceId: \"ns1\"\n" + + " namenodeId: \"ns1-ha2\"\n" + + " clusterId: \"cluster1\"\n" + + " blockPoolId: \"block1\"\n" + + " webAddress: \"nn02\"\n" + + " rpcAddress: \"rpc2\"\n" + + " serviceAddress: \"service2\"\n" + + " lifelineAddress: \"lifeline2\"\n" + + " state: \"STANDBY\"\n" + + " isSafeMode: false\n" + + " webScheme: \"https\"\n" + + " \n" + + "\n" + + "---- MountTable ----\n" + + "\n" + + "---- RouterState ----"; + // Replace the time values with XXX + assertEquals(expected, + buffer.toString().trim().replaceAll("[0-9]{4,}+", "XXX")); + } + private void addMountTable(String src, String nsId, String dst) throws Exception { String[] argv = new String[] {"-add", src, nsId, dst}; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java index 57185a0a60..9f600cb6f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java @@ -35,7 +35,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { private boolean giveErrors = false; private boolean initialized = false; - private final Map> valueMap = new HashMap<>(); + private static final Map> VALUE_MAP = new HashMap<>(); @Override public boolean initDriver() { @@ -56,7 +56,7 @@ public boolean isDriverReady() { @Override public void close() throws Exception { - valueMap.clear(); + VALUE_MAP.clear(); initialized = false; } @@ -82,7 +82,7 @@ private void checkErrors() throws IOException { @SuppressWarnings("unchecked") public QueryResult get(Class clazz) throws IOException { checkErrors(); - Map map = valueMap.get(StateStoreUtils.getRecordName(clazz)); + Map map = VALUE_MAP.get(StateStoreUtils.getRecordName(clazz)); List results = map != null ? new ArrayList<>((Collection) map.values()) : new ArrayList<>(); return new QueryResult<>(results, System.currentTimeMillis()); @@ -96,7 +96,7 @@ public boolean putAll(List records, checkErrors(); for (T record : records) { Map map = - valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()), + VALUE_MAP.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()), k -> new HashMap<>()); String key = record.getPrimaryKey(); BaseRecord oldRecord = map.get(key); @@ -110,10 +110,17 @@ public boolean putAll(List records, return true; } + /** + * Clear all records from the store. + */ + public void clearAll() { + VALUE_MAP.clear(); + } + @Override public boolean removeAll(Class clazz) throws IOException { checkErrors(); - return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null; + return VALUE_MAP.remove(StateStoreUtils.getRecordName(clazz)) != null; } @Override @@ -124,7 +131,7 @@ public int remove(Class clazz, checkErrors(); int result = 0; Map map = - valueMap.get(StateStoreUtils.getRecordName(clazz)); + VALUE_MAP.get(StateStoreUtils.getRecordName(clazz)); if (map != null) { for (Iterator itr = map.values().iterator(); itr.hasNext();) { BaseRecord record = itr.next(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java index 4289999429..8226178fe7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java @@ -101,6 +101,7 @@ public void testStateStoreResilience() throws Exception { conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false); service.init(conf); MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver(); + driver.clearAll(); // Add two records for block1 driver.put(MembershipState.newInstance("routerId", "ns1", "ns1-ha1", "cluster1", "block1", "rpc1",