HDFS-17534. RBF: Support leader follower mode for multiple subclusters (#6861). Contributed by Yuanbo Liu.
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>

parent 783a852029
commit f211af30be
MultipleDestinationMountTableResolver.java

@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.LeaderFollowerResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver;
@@ -78,8 +79,8 @@ public MultipleDestinationMountTableResolver(
     addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
     addResolver(DestinationOrder.RANDOM, new RandomResolver());
     addResolver(DestinationOrder.HASH_ALL, new HashResolver());
-    addResolver(DestinationOrder.SPACE,
-        new AvailableSpaceResolver(conf, router));
+    addResolver(DestinationOrder.SPACE, new AvailableSpaceResolver(conf, router));
+    addResolver(DestinationOrder.LEADER_FOLLOWER, new LeaderFollowerResolver());
   }
 
   @Override
DestinationOrder.java

@@ -29,11 +29,14 @@ public enum DestinationOrder {
   LOCAL, // Local first
   RANDOM, // Random order
   HASH_ALL, // Follow consistent hashing
-  SPACE; // Available space based order
+  SPACE, // Available space based order
+  LEADER_FOLLOWER; // Try the leader sub-cluster first; if it fails, try the followers
 
   /** Approaches that write folders in all subclusters. */
   public static final EnumSet<DestinationOrder> FOLDER_ALL = EnumSet.of(
       HASH_ALL,
       RANDOM,
-      SPACE);
+      SPACE,
+      // Leader-follower mode should make sure all directories exist in case of switching.
+      LEADER_FOLLOWER);
 }
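Because LEADER_FOLLOWER joins FOLDER_ALL above, directories under such a mount point are created in every subcluster, so a follower already has the directory tree when a switch sends traffic to it. A minimal sketch (not part of the patch) of how that membership can be checked, assuming the patched DestinationOrder is on the classpath:

    import java.util.EnumSet;
    import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;

    public class FolderAllCheck {
      public static void main(String[] args) {
        // Orders in FOLDER_ALL create folders in all subclusters.
        for (DestinationOrder order : EnumSet.allOf(DestinationOrder.class)) {
          System.out.println(order + " -> "
              + DestinationOrder.FOLDER_ALL.contains(order));
        }
        // With this patch applied, "LEADER_FOLLOWER -> true" is expected.
      }
    }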
LeaderFollowerResolver.java (new file)

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LEADER_FOLLOWER is meant for cross-cluster disaster tolerance,
+ * and the order of namespaces is always "leader,follower,follower...".
+ * Data is written to the leader sub-cluster whenever possible; if the
+ * leader fails, the followers are tried. The same applies to reads.
+ */
+public class LeaderFollowerResolver implements OrderedResolver {
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LeaderFollowerResolver.class);
+
+  @Override
+  public String getFirstNamespace(String path, PathLocation loc) {
+    try {
+      // Always return the first destination.
+      // In leader/follower mode, the admin adds sub-clusters
+      // in the order leader,follower,follower...
+      // The first element is always the leader sub-cluster,
+      // so invoking getDefaultLocation is suitable here.
+      RemoteLocation remoteLocation = loc.getDefaultLocation();
+      return remoteLocation.getNameserviceId();
+    } catch (Exception ex) {
+      LOG.error("Cannot find sub-cluster for {}", loc);
+      return null;
+    }
+  }
+}
MountTablePBImpl.java

@@ -343,6 +343,8 @@ private DestinationOrder convert(DestOrder order) {
       return DestinationOrder.HASH_ALL;
     case SPACE:
       return DestinationOrder.SPACE;
+    case LEADER_FOLLOWER:
+      return DestinationOrder.LEADER_FOLLOWER;
     default:
       return DestinationOrder.HASH;
     }
@@ -358,6 +360,8 @@ private DestOrder convert(DestinationOrder order) {
       return DestOrder.HASH_ALL;
     case SPACE:
       return DestOrder.SPACE;
+    case LEADER_FOLLOWER:
+      return DestOrder.LEADER_FOLLOWER;
     default:
       return DestOrder.HASH;
     }
AddMountAttributes.java

@@ -148,6 +148,7 @@ public MountTable getNewOrUpdatedMountTableEntryWithAttributes(MountTable existi
    * @throws IOException If mount table instantiation fails.
    */
   private MountTable getMountTableForAddRequest(String mountSrc) throws IOException {
+    // A linked hash map keeps the insertion order of the nameservices.
     Map<String, String> destMap = new LinkedHashMap<>();
     for (String ns : this.getNss()) {
       destMap.put(ns, this.getDest());
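The comment above matters because map iteration order becomes the order of destinations, and LeaderFollowerResolver always picks the first one. A JDK-only sketch (not part of the patch) contrasting LinkedHashMap with HashMap:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class InsertionOrderDemo {
      public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order, keeping the leader first.
        Map<String, String> linked = new LinkedHashMap<>();
        linked.put("ns2", "/data"); // leader, added first
        linked.put("ns1", "/data"); // follower
        linked.put("ns0", "/data"); // follower
        System.out.println(linked.keySet()); // [ns2, ns1, ns0]

        // HashMap gives no ordering guarantee, so the leader could move.
        Map<String, String> plain = new HashMap<>(linked);
        System.out.println(plain.keySet()); // order unspecified
      }
    }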
RouterAdmin.java

@@ -164,23 +164,25 @@ private static String getUsage(String cmd) {
     if (cmd.equals("-add")) {
       return "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
           + "[-readonly] [-faulttolerant] "
-          + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+          + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
           + "-owner <owner> -group <group> -mode <mode>]";
     } else if (cmd.equals(ADD_ALL_COMMAND)) {
       return "\t[" + ADD_ALL_COMMAND + " "
           + "<source1> <nameservice1,nameservice2,...> <destination1> "
-          + "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+          + "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE"
+          + "|LEADER_FOLLOWER] "
           + "-owner <owner1> -group <group1> -mode <mode1>"
           + " , "
           + "<source2> <nameservice1,nameservice2,...> <destination2> "
-          + "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+          + "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE"
+          + "|LEADER_FOLLOWER] "
           + "-owner <owner2> -group <group2> -mode <mode2>"
           + " , ...]";
     } else if (cmd.equals("-update")) {
       return "\t[-update <source>"
           + " [<nameservice1, nameservice2, ...> <destination>] "
           + "[-readonly true|false] [-faulttolerant true|false] "
-          + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+          + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
           + "-owner <owner> -group <group> -mode <mode>]";
     } else if (cmd.equals("-rm")) {
       return "\t[-rm <source>]";
FederationProtocol.proto

@@ -142,6 +142,7 @@ message MountTableRecordProto {
     RANDOM = 2;
     HASH_ALL = 3;
     SPACE = 4;
+    LEADER_FOLLOWER = 5;
   }
   optional DestOrder destOrder = 6 [default = HASH];
 
HDFSRouterFederation.md

@@ -288,6 +288,7 @@ For deciding where to create a new file/folder it uses the order parameter, it c
 * RANDOM: Random subcluster. This is usually useful for balancing the load across. Folders are created in all subclusters.
 * HASH_ALL: Follow consistent hashing at all the levels. This approach tries to balance the reads and writes evenly across subclusters. Folders are created in all subclusters.
 * SPACE: Try to write data in the subcluster with the most available space. Folders are created in all subclusters.
+* LEADER_FOLLOWER: Try to write data to the leader subcluster as much as possible; if that fails, try the follower subclusters. Folders are created in all subclusters.
 
 For the hash-based approaches, the difference is that HASH would make all the files/folders within a folder belong to the same subcluster while HASH_ALL will spread all files under a mount point.
 For example, assuming we have a HASH mount point for `/data/hash`, files and folders under `/data/hash/folder0` will all be in the same subcluster.
@@ -297,6 +298,9 @@ RANDOM can be used for reading and writing data from/into different subclusters.
 The common use for this approach is to have the same data in multiple subclusters and balance the reads across subclusters.
 For example, if thousands of containers need to read the same data (e.g., a library), one can use RANDOM to read the data from any of the subclusters.
 
+LEADER_FOLLOWER can be used for cross-cluster disaster tolerance; it is not meant for spreading load across subclusters. With a mount point like `-add /data ns2,ns1 /data -order LEADER_FOLLOWER`,
+`ns2` is considered the leader (active) subcluster and `ns1` a follower subcluster. The order of namespaces is always `leader,follower,follower...`.
+
 To determine which subcluster contains a file:
 
     [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt
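For illustration only (not part of the patch), the `-add /data ns2,ns1 /data -order LEADER_FOLLOWER` command above corresponds roughly to building a mount entry whose destination map lists the leader first, mirroring the test setup later in this change:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
    import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

    public class LeaderFollowerMountExample {
      public static MountTable buildEntry() throws Exception {
        // LinkedHashMap keeps ns2 (the leader) first in the destination list.
        Map<String, String> destMap = new LinkedHashMap<>();
        destMap.put("ns2", "/data"); // leader
        destMap.put("ns1", "/data"); // follower
        MountTable entry = MountTable.newInstance("/data", destMap);
        entry.setDestOrder(DestinationOrder.LEADER_FOLLOWER);
        return entry;
      }
    }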
TestMultipleDestinationResolver.java

@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -104,6 +105,15 @@ public void setup() throws IOException {
     MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly);
     readOnlyEntry.setReadOnly(true);
     resolver.addEntry(readOnlyEntry);
+
+    // Leader follower mode
+    Map<String, String> leaderFollowerMap = new LinkedHashMap<>();
+    leaderFollowerMap.put("subcluster1", "/leaderfollower");
+    leaderFollowerMap.put("subcluster0", "/leaderfollower");
+    leaderFollowerMap.put("subcluster2", "/leaderfollower");
+    MountTable leaderFollowerEntry = MountTable.newInstance("/leaderfollower", leaderFollowerMap);
+    leaderFollowerEntry.setDestOrder(DestinationOrder.LEADER_FOLLOWER);
+    resolver.addEntry(leaderFollowerEntry);
   }
 
   @Test
@@ -340,6 +350,13 @@ public void testReadOnly() throws IOException {
     assertDest("subcluster1", dest12);
   }
 
+  @Test
+  public void testLeaderFollower() throws IOException {
+    PathLocation dest0 =
+        resolver.getDestinationForPath("/leaderfollower/folder0/file0.txt");
+    assertDest("subcluster1", dest0);
+  }
+
   @Test
   public void testLocalResolver() throws IOException {
     PathLocation dest0 =
TestLeaderFollowerResolver.java (new file)

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestLeaderFollowerResolver {
+  @Test
+  public void testResolve() throws Exception {
+    // Mock the subcluster mapping
+    Configuration conf = new Configuration();
+    Router router = mock(Router.class);
+    LeaderFollowerResolver leaderFollowerResolver = new LeaderFollowerResolver();
+
+    // Add the mocks to the resolver
+    MultipleDestinationMountTableResolver resolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    resolver.addResolver(DestinationOrder.LEADER_FOLLOWER, leaderFollowerResolver);
+
+    Map<String, String> mapLocal = new LinkedHashMap<>();
+    mapLocal.put("subcluster2", "/local");
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LEADER_FOLLOWER);
+    resolver.addEntry(localEntry);
+
+    PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster2", dest);
+
+  }
+
+  private static void assertDestination(String expectedNsId, PathLocation loc) {
+    List<RemoteLocation> dests = loc.getDestinations();
+    RemoteLocation dest = dests.get(0);
+    assertEquals(expectedNsId, dest.getNameserviceId());
+  }
+}
TestRouterAdminCLI.java

@@ -265,6 +265,31 @@ public void testAddOrderMountTable() throws Exception {
     testAddOrderMountTable(DestinationOrder.RANDOM);
     testAddOrderMountTable(DestinationOrder.HASH_ALL);
     testAddOrderMountTable(DestinationOrder.SPACE);
+    testAddOrderMountTable(DestinationOrder.LEADER_FOLLOWER);
+  }
+
+  @Test
+  public void testLeaderFollower() throws Exception {
+    DestinationOrder order = DestinationOrder.LEADER_FOLLOWER;
+    final String mnt = "/newAdd1" + order;
+    final String nsId = "ns1,ns2,ns0";
+    final String dest = "/changAdd";
+
+    String[] argv = new String[] {
+        "-add", mnt, nsId, dest, "-order", order.toString()};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mnt);
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    List<MountTable> entries = response.getEntries();
+    assertEquals(1, entries.size());
+    assertEquals("ns1", entries.get(0).getDestinations().get(0).getNameserviceId());
+    assertEquals("ns2", entries.get(0).getDestinations().get(1).getNameserviceId());
+    assertEquals("ns0", entries.get(0).getDestinations().get(2).getNameserviceId());
+  }
+
   @Test
@@ -795,7 +820,7 @@ public void testInvalidArgumentMessage() throws Exception {
     assertTrue("Wrong message: " + out, out.toString().contains(
         "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
             + "[-readonly] [-faulttolerant] "
-            + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+            + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
             + "-owner <owner> -group <group> -mode <mode>]"));
     out.reset();
 
@@ -804,7 +829,7 @@ public void testInvalidArgumentMessage() throws Exception {
     assertTrue("Wrong message: " + out, out.toString().contains(
         "\t[-update <source> [<nameservice1, nameservice2, ...> <destination>] "
             + "[-readonly true|false] [-faulttolerant true|false] "
-            + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+            + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
            + "-owner <owner> -group <group> -mode <mode>]"));
     out.reset();
 
@@ -852,18 +877,18 @@ public void testInvalidArgumentMessage() throws Exception {
     String expected = "Usage: hdfs dfsrouteradmin :\n"
         + "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
         + "[-readonly] [-faulttolerant] "
-        + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+        + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
         + "-owner <owner> -group <group> -mode <mode>]\n"
         + "\t[-addAll <source1> <nameservice1,nameservice2,...> <destination1> "
-        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
         + "-owner <owner1> -group <group1> -mode <mode1>"
         + " , <source2> <nameservice1,nameservice2,...> <destination2> "
-        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
        + "-owner <owner2> -group <group2> -mode <mode2> , ...]\n"
         + "\t[-update <source> [<nameservice1, nameservice2, ...> "
         + "<destination>] [-readonly true|false]"
         + " [-faulttolerant true|false] "
-        + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+        + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE|LEADER_FOLLOWER] "
         + "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
         + "\t[-ls [-d] <path>]\n"
         + "\t[-getDestination <path>]\n"