HDFS-12875. RBF: Complete logic for -readonly option of dfsrouteradmin add command. Contributed by Inigo Goiri.

This commit is contained in:
Inigo Goiri 2017-12-11 15:14:57 -08:00
parent 2316f52690
commit 5cd1056ad7
9 changed files with 275 additions and 38 deletions

View File

@ -103,8 +103,10 @@
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@ -1982,6 +1984,17 @@ private List<RemoteLocation> getLocationsForPath(
this.subclusterResolver);
}
// We may block some write operations
if (opCategory.get() == OperationCategory.WRITE) {
// Check if the path is in a read only mount point
if (isPathReadOnly(path)) {
if (this.rpcMonitor != null) {
this.rpcMonitor.routerFailureReadOnly();
}
throw new IOException(path + " is in a read only mount point");
}
}
return location.getDestinations();
} catch (IOException ioe) {
if (this.rpcMonitor != null) {
@ -1991,6 +2004,27 @@ private List<RemoteLocation> getLocationsForPath(
}
}
/**
* Check if a path is in a read only mount point.
*
* @param path Path to check.
* @return If the path is in a read only mount point.
*/
private boolean isPathReadOnly(final String path) {
if (subclusterResolver instanceof MountTableResolver) {
try {
MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
MountTable entry = mountTable.getMountPoint(path);
if (entry != null && entry.isReadOnly()) {
return true;
}
} catch (IOException e) {
LOG.error("Cannot get mount point: {}", e.getMessage());
}
}
return false;
}
/**
* Get the modification dates for mount points.
*

View File

@ -77,7 +77,7 @@ public RouterAdmin(Configuration conf) {
public void printUsage() {
String usage = "Federation Admin Tools:\n"
+ "\t[-add <source> <nameservice> <destination> "
+ "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL]]\n"
+ "[-readonly]\n"
+ "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n";
System.out.println(usage);

View File

@ -425,7 +425,7 @@ Runs the DFS router. See [Router](./HDFSRouterFederation.html#Router) for more i
Usage:
hdfs dfsrouteradmin
[-add <source> <nameservice> <destination>]
[-add <source> <nameservice> <destination> [-readonly]]
[-rm <source>]
[-ls <path>]

View File

@ -184,6 +184,10 @@ For example, to create three mount points and list them:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data/app2 ns3 /data/app2
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -ls
It also supports mount points that disallow writes:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /readonly ns1 / -readonly
If a mount point is not set, the Router will map it to the default namespace `dfs.federation.router.default.nameserviceId`.

View File

@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@ -52,6 +51,9 @@
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;
/**
* Helper utilities for testing HDFS Federation.
@ -108,33 +110,41 @@ public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
return report;
}
public static void waitNamenodeRegistered(ActiveNamenodeResolver resolver,
String nsId, String nnId, FederationNamenodeServiceState finalState)
throws InterruptedException, IllegalStateException, IOException {
for (int loopCount = 0; loopCount < 20; loopCount++) {
if (loopCount > 0) {
Thread.sleep(1000);
}
/**
* Wait for a namenode to be registered with a particular state.
* @param resolver Active namenode resolver.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
* @param finalState State to check for.
* @throws Exception Failed to verify State Store registration of namenode
* nsId:nnId for state.
*/
public static void waitNamenodeRegistered(
final ActiveNamenodeResolver resolver,
final String nsId, final String nnId,
final FederationNamenodeServiceState state) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<? extends FederationNamenodeContext> namenodes =
resolver.getNamenodesForNameserviceId(nsId);
if (namenodes != null) {
for (FederationNamenodeContext namenode : namenodes) {
// Check if this is the Namenode we are checking
if (namenode.getNamenodeId() == nnId ||
namenode.getNamenodeId().equals(nnId)) {
if (finalState != null && !namenode.getState().equals(finalState)) {
// Wrong state, wait a bit more
break;
} else {
// Found and verified
return;
return state == null || namenode.getState().equals(state);
}
}
}
} catch (IOException e) {
// Ignore
}
fail("Failed to verify State Store registration of " + nsId + " " + nnId +
" for state " + finalState);
return false;
}
}, 1000, 20 * 1000);
}
public static boolean verifyDate(Date d1, Date d2, long precision) {

View File

@ -747,8 +747,7 @@ public void registerNamenodes() throws IOException {
}
}
public void waitNamenodeRegistration()
throws InterruptedException, IllegalStateException, IOException {
public void waitNamenodeRegistration() throws Exception {
for (RouterContext r : this.routers) {
Router router = r.router;
for (NamenodeContext nn : this.namenodes) {
@ -761,7 +760,7 @@ public void waitNamenodeRegistration()
public void waitRouterRegistrationQuorum(RouterContext router,
FederationNamenodeServiceState state, String nsId, String nnId)
throws InterruptedException, IOException {
throws Exception {
LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
waitNamenodeRegistered(nnResolver, nsId, nnId, state);

View File

@ -69,6 +69,7 @@ private Map<String, String> getMountTableEntry(
* ______file1.txt -> 4:/user/file1.txt
* __usr
* ____bin -> 2:/bin
* __readonly -> 2:/tmp
*
* @throws IOException If it cannot set the mount table.
*/
@ -107,6 +108,12 @@ private void setupMountTable() throws IOException {
// /user/a/demo/test/b
map = getMountTableEntry("3", "/user/test");
mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map));
// /readonly
map = getMountTableEntry("2", "/tmp");
MountTable readOnlyEntry = MountTable.newInstance("/readonly", map);
readOnlyEntry.setReadOnly(true);
mountTable.addEntry(readOnlyEntry);
}
@Before
@ -152,6 +159,9 @@ public void testDestination() throws IOException {
assertEquals("3->/user/test/a",
mountTable.getDestinationForPath("/user/test/a").toString());
assertEquals("2->/tmp/tesfile1.txt",
mountTable.getDestinationForPath("/readonly/tesfile1.txt").toString());
}
private void compareLists(List<String> list1, String[] list2) {
@ -166,8 +176,8 @@ public void testGetMountPoints() throws IOException {
// Check getting all mount points (virtual and real) beneath a path
List<String> mounts = mountTable.getMountPoints("/");
assertEquals(3, mounts.size());
compareLists(mounts, new String[] {"tmp", "user", "usr"});
assertEquals(4, mounts.size());
compareLists(mounts, new String[] {"tmp", "user", "usr", "readonly"});
mounts = mountTable.getMountPoints("/user");
assertEquals(2, mounts.size());
@ -212,9 +222,10 @@ public void testGetMounts() throws IOException {
// Check listing the mount table records at or beneath a path
List<MountTable> records = mountTable.getMounts("/");
assertEquals(8, records.size());
assertEquals(9, records.size());
compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin",
"user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt"});
"user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt",
"readonly"});
records = mountTable.getMounts("/user");
assertEquals(5, records.size());
@ -229,6 +240,11 @@ public void testGetMounts() throws IOException {
records = mountTable.getMounts("/tmp");
assertEquals(1, records.size());
compareRecords(records, new String[] {"/tmp"});
records = mountTable.getMounts("/readonly");
assertEquals(1, records.size());
compareRecords(records, new String[] {"/readonly"});
assertTrue(records.get(0).isReadOnly());
}
@Test
@ -237,7 +253,7 @@ public void testRemoveSubTree()
// 3 mount points are present /tmp, /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
new String[] {"user", "usr", "tmp", "readonly"});
// /tmp currently points to namespace 2
assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt")
@ -248,7 +264,7 @@ public void testRemoveSubTree()
// Now 2 mount points are present /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr"});
new String[] {"user", "usr", "readonly"});
// /tmp no longer exists, uses default namespace for mapping /
assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt")
@ -261,7 +277,7 @@ public void testRemoveVirtualNode()
// 3 mount points are present /tmp, /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
new String[] {"user", "usr", "tmp", "readonly"});
// /usr is virtual, uses namespace 1->/
assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt")
@ -272,7 +288,7 @@ public void testRemoveVirtualNode()
// Verify the remove failed
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
new String[] {"user", "usr", "tmp", "readonly"});
}
@Test
@ -304,7 +320,7 @@ public void testRefreshEntries()
// Initial table loaded
testDestination();
assertEquals(8, mountTable.getMounts("/").size());
assertEquals(9, mountTable.getMounts("/").size());
// Replace table with /1 and /2
List<MountTable> records = new ArrayList<>();

View File

@ -143,6 +143,37 @@ public void testAddDuplicateMountTable() throws IOException {
assertFalse(addResponse2.getStatus());
}
@Test
public void testAddReadOnlyMountTable() throws IOException {
MountTable newEntry = MountTable.newInstance(
"/readonly", Collections.singletonMap("ns0", "/testdir"),
Time.now(), Time.now());
newEntry.setReadOnly(true);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTable = client.getMountTableManager();
// Existing mount table size
List<MountTable> records = getMountTableEntries(mountTable);
assertEquals(records.size(), mockMountTable.size());
// Add
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
assertTrue(addResponse.getStatus());
// New mount table size
List<MountTable> records2 = getMountTableEntries(mountTable);
assertEquals(records2.size(), mockMountTable.size() + 1);
// Check that we have the read only entry
MountTable record = getMountTableEntry("/readonly");
assertEquals("/readonly", record.getSourcePath());
assertTrue(record.isReadOnly());
}
@Test
public void testRemoveMountTable() throws IOException {

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test a router end-to-end including the MountTable.
*/
public class TestRouterMountTable {
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext;
private static RouterContext routerContext;
private static MountTableResolver mountTable;
@BeforeClass
public static void globalSetUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 1);
Configuration conf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
// Get the end points
nnContext = cluster.getRandomNamenode();
routerContext = cluster.getRandomRouter();
Router router = routerContext.getRouter();
mountTable = (MountTableResolver) router.getSubclusterResolver();
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testReadOnly() throws Exception {
// Add a read only entry
MountTable readOnlyEntry = MountTable.newInstance(
"/readonly", Collections.singletonMap("ns0", "/testdir"));
readOnlyEntry.setReadOnly(true);
assertTrue(addMountTable(readOnlyEntry));
// Add a regular entry
MountTable regularEntry = MountTable.newInstance(
"/regular", Collections.singletonMap("ns0", "/testdir"));
assertTrue(addMountTable(regularEntry));
// Create a folder which should show in all locations
final FileSystem nnFs = nnContext.getFileSystem();
final FileSystem routerFs = routerContext.getFileSystem();
assertTrue(routerFs.mkdirs(new Path("/regular/newdir")));
FileStatus dirStatusNn =
nnFs.getFileStatus(new Path("/testdir/newdir"));
assertTrue(dirStatusNn.isDirectory());
FileStatus dirStatusRegular =
routerFs.getFileStatus(new Path("/regular/newdir"));
assertTrue(dirStatusRegular.isDirectory());
FileStatus dirStatusReadOnly =
routerFs.getFileStatus(new Path("/readonly/newdir"));
assertTrue(dirStatusReadOnly.isDirectory());
// It should fail writing into a read only path
try {
routerFs.mkdirs(new Path("/readonly/newdirfail"));
fail("We should not be able to write into a read only mount point");
} catch (IOException ioe) {
String msg = ioe.getMessage();
assertTrue(msg.startsWith(
"/readonly/newdirfail is in a read only mount point"));
}
}
/**
* Add a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to add.
* @return If it was succesfully added.
* @throws IOException Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(entry);
AddMountTableEntryResponse addResponse =
mountTableManager.addMountTableEntry(addRequest);
// Reload the Router cache
mountTable.loadCache(true);
return addResponse.getStatus();
}
}