From f2355c706361594b7b2ef8b65b37060eab1d66df Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Thu, 22 Nov 2018 00:34:08 +0530 Subject: [PATCH] HDFS-13776. RBF: Add Storage policies related ClientProtocol APIs. Contributed by Dibyendu Karmakar. --- .../router/RouterClientProtocol.java | 24 +-- .../router/RouterStoragePolicy.java | 98 ++++++++++++ .../federation/MiniRouterDFSCluster.java | 13 ++ .../federation/router/TestRouterRpc.java | 57 +++++++ .../TestRouterRpcStoragePolicySatisfier.java | 149 ++++++++++++++++++ 5 files changed, 325 insertions(+), 16 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index c8b7cdd1aa..303eedf106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -121,6 +121,8 @@ public class RouterClientProtocol implements ClientProtocol { private final String superGroup; /** Erasure coding calls. */ private final ErasureCoding erasureCoding; + /** StoragePolicy calls. **/ + private final RouterStoragePolicy storagePolicy; RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { this.rpcServer = rpcServer; @@ -144,6 +146,7 @@ public class RouterClientProtocol implements ClientProtocol { DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.erasureCoding = new ErasureCoding(rpcServer); + this.storagePolicy = new RouterStoragePolicy(rpcServer); } @Override @@ -278,22 +281,12 @@ public boolean setReplication(String src, short replication) @Override public void setStoragePolicy(String src, String policyName) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - - List locations = rpcServer.getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setStoragePolicy", - new Class[] {String.class, String.class}, - new RemoteParam(), policyName); - rpcClient.invokeSequential(locations, method, null, null); + storagePolicy.setStoragePolicy(src, policyName); } @Override public BlockStoragePolicy[] getStoragePolicies() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("getStoragePolicies"); - String ns = subclusterResolver.getDefaultNamespace(); - return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + return storagePolicy.getStoragePolicies(); } @Override @@ -1463,13 +1456,12 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { @Override public void unsetStoragePolicy(String src) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + storagePolicy.unsetStoragePolicy(src); } @Override public BlockStoragePolicy getStoragePolicy(String path) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); - return null; + return storagePolicy.getStoragePolicy(path); } @Override @@ -1557,7 +1549,7 @@ public void msync() throws IOException { @Override public void satisfyStoragePolicy(String path) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + storagePolicy.satisfyStoragePolicy(path); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java new file mode 100644 index 0000000000..7145940cca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; +import java.util.List; + +/** + * Module that implements all the RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to + * Storage Policy in the {@link RouterRpcServer}. + */ +public class RouterStoragePolicy { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to map global name space to HDFS subcluster name spaces. */ + private final FileSubclusterResolver subclusterResolver; + + public RouterStoragePolicy(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.subclusterResolver = this.rpcServer.getSubclusterResolver(); + } + + public void setStoragePolicy(String src, String policyName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setStoragePolicy", + new Class[] {String.class, String.class}, + new RemoteParam(), + policyName); + rpcClient.invokeSequential(locations, method, null, null); + } + + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getStoragePolicies"); + String ns = subclusterResolver.getDefaultNamespace(); + return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + } + + public void unsetStoragePolicy(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + + List locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("unsetStoragePolicy", + new Class[] {String.class}, + new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } + + public BlockStoragePolicy getStoragePolicy(String path) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + + List locations = rpcServer.getLocationsForPath(path, false); + RemoteMethod method = new RemoteMethod("getStoragePolicy", + new Class[] {String.class}, + new RemoteParam()); + return (BlockStoragePolicy) rpcClient.invokeSequential(locations, method); + } + + public void satisfyStoragePolicy(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + + List locations = rpcServer.getLocationsForPath(path, true); + RemoteMethod method = new RemoteMethod("satisfyStoragePolicy", + new Class[] {String.class}, + new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index a5693a6f2c..2df883cff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -67,6 +67,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSClient; @@ -118,6 +119,8 @@ public class MiniRouterDFSCluster { private boolean highAvailability; /** Number of datanodes per nameservice. */ private int numDatanodesPerNameservice = 2; + /** Custom storage type for each datanode. */ + private StorageType[][] storageTypes = null; /** Mini cluster. */ private MiniDFSCluster cluster; @@ -614,6 +617,15 @@ public void setNumDatanodesPerNameservice(int num) { this.numDatanodesPerNameservice = num; } + /** + * Set custom storage type configuration for each datanode. + * If storageTypes is uninitialized or passed null then + * StorageType.DEFAULT is used. + */ + public void setStorageTypes(StorageType[][] storageTypes) { + this.storageTypes = storageTypes; + } + /** * Set the DNs to belong to only one subcluster. */ @@ -767,6 +779,7 @@ public void startCluster(Configuration overrideConf) { .numDataNodes(numDNs) .nnTopology(topology) .dataNodeConfOverlays(dnConfs) + .storageTypes(storageTypes) .build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 204366e48d..8632203b06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -769,6 +770,62 @@ public void testProxyStoragePolicy() throws Exception { m, new Object[] {badPath, "badpolicy"}); } + @Test + public void testProxyGetAndUnsetStoragePolicy() throws Exception { + String file = "/testGetStoragePolicy"; + String nnFilePath = cluster.getNamenodeTestDirectoryForNS(ns) + file; + String routerFilePath = cluster.getFederatedTestDirectoryForNS(ns) + file; + + createFile(routerFS, routerFilePath, 32); + + // Get storage policy via router + BlockStoragePolicy policy = routerProtocol.getStoragePolicy(routerFilePath); + // Verify default policy is HOT + assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName()); + assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId()); + + // Get storage policies via router + BlockStoragePolicy[] policies = routerProtocol.getStoragePolicies(); + BlockStoragePolicy[] nnPolicies = namenode.getClient().getStoragePolicies(); + // Verify policie returned by router is same as policies returned by NN + assertArrayEquals(nnPolicies, policies); + + BlockStoragePolicy newPolicy = policies[0]; + while (newPolicy.isCopyOnCreateFile()) { + // Pick a non copy on create policy. Beacuse if copyOnCreateFile is set + // then the policy cannot be changed after file creation. + Random rand = new Random(); + int randIndex = rand.nextInt(policies.length); + newPolicy = policies[randIndex]; + } + routerProtocol.setStoragePolicy(routerFilePath, newPolicy.getName()); + + // Get storage policy via router + policy = routerProtocol.getStoragePolicy(routerFilePath); + // Verify default policy + assertEquals(newPolicy.getName(), policy.getName()); + assertEquals(newPolicy.getId(), policy.getId()); + + // Verify policy via NN + BlockStoragePolicy nnPolicy = + namenode.getClient().getStoragePolicy(nnFilePath); + assertEquals(nnPolicy.getName(), policy.getName()); + assertEquals(nnPolicy.getId(), policy.getId()); + + // Unset storage policy via router + routerProtocol.unsetStoragePolicy(routerFilePath); + + // Get storage policy + policy = routerProtocol.getStoragePolicy(routerFilePath); + assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName()); + assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId()); + + // Verify policy via NN + nnPolicy = namenode.getClient().getStoragePolicy(nnFilePath); + assertEquals(nnPolicy.getName(), policy.getName()); + assertEquals(nnPolicy.getId(), policy.getId()); + } + @Test public void testProxyGetPreferedBlockSize() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java new file mode 100644 index 0000000000..fa1079a4ed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +/** + * Test StoragePolicySatisfy through router rpc calls. + */ +public class TestRouterRpcStoragePolicySatisfier { + + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + + /** Client interface to the Router. */ + private static ClientProtocol routerProtocol; + + /** Filesystem interface to the Router. */ + private static FileSystem routerFS; + /** Filesystem interface to the Namenode. */ + private static FileSystem nnFS; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new MiniRouterDFSCluster(false, 1); + // Set storage types for the cluster + StorageType[][] newtypes = new StorageType[][] { + {StorageType.ARCHIVE, StorageType.DISK}}; + cluster.setStorageTypes(newtypes); + + Configuration conf = cluster.getNamenodes().get(0).getConf(); + conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + HdfsConstants.StoragePolicySatisfierMode.EXTERNAL.toString()); + // Reduced refresh cycle to update latest datanodes. + conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS, + 1000); + cluster.addNamenodeOverrides(conf); + + cluster.setNumDatanodesPerNameservice(1); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + // Create mock locations + cluster.installMockLocations(); + + // Random router for this test + MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter(); + + routerProtocol = rndRouter.getClient().getNamenode(); + routerFS = rndRouter.getFileSystem(); + nnFS = cluster.getNamenodes().get(0).getFileSystem(); + + NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf, + HdfsServerConstants.MOVER_ID_PATH, 1, false); + + StoragePolicySatisfier externalSps = new StoragePolicySatisfier(conf); + Context externalCtxt = new ExternalSPSContext(externalSps, nnc); + + externalSps.init(externalCtxt); + externalSps.start(HdfsConstants.StoragePolicySatisfierMode.EXTERNAL); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + } + + @Test + public void testStoragePolicySatisfier() throws Exception { + final String file = "/testStoragePolicySatisfierCommand"; + short repl = 1; + int size = 32; + DFSTestUtil.createFile(routerFS, new Path(file), size, repl, 0); + // Varify storage type is DISK + DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 1, 20000, + (DistributedFileSystem) routerFS); + // Set storage policy as COLD + routerProtocol + .setStoragePolicy(file, HdfsConstants.COLD_STORAGE_POLICY_NAME); + // Verify storage policy is set properly + BlockStoragePolicy storagePolicy = routerProtocol.getStoragePolicy(file); + assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME, + storagePolicy.getName()); + // Invoke satisfy storage policy + routerProtocol.satisfyStoragePolicy(file); + // Verify storage type is ARCHIVE + DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000, + (DistributedFileSystem) routerFS); + + // Verify storage type via NN + DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000, + (DistributedFileSystem) nnFS); + } +}