HDFS-13776. RBF: Add Storage policies related ClientProtocol APIs. Contributed by Dibyendu Karmakar.
This commit is contained in:
parent
f4bd1114ff
commit
f2355c7063
@ -121,6 +121,8 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
private final String superGroup;
|
private final String superGroup;
|
||||||
/** Erasure coding calls. */
|
/** Erasure coding calls. */
|
||||||
private final ErasureCoding erasureCoding;
|
private final ErasureCoding erasureCoding;
|
||||||
|
/** StoragePolicy calls. **/
|
||||||
|
private final RouterStoragePolicy storagePolicy;
|
||||||
|
|
||||||
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
|
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
|
||||||
this.rpcServer = rpcServer;
|
this.rpcServer = rpcServer;
|
||||||
@ -144,6 +146,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||||||
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
|
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
|
||||||
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
|
||||||
this.erasureCoding = new ErasureCoding(rpcServer);
|
this.erasureCoding = new ErasureCoding(rpcServer);
|
||||||
|
this.storagePolicy = new RouterStoragePolicy(rpcServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -278,22 +281,12 @@ public boolean setReplication(String src, short replication)
|
|||||||
@Override
|
@Override
|
||||||
public void setStoragePolicy(String src, String policyName)
|
public void setStoragePolicy(String src, String policyName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
|
storagePolicy.setStoragePolicy(src, policyName);
|
||||||
|
|
||||||
List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
|
|
||||||
RemoteMethod method = new RemoteMethod("setStoragePolicy",
|
|
||||||
new Class<?>[] {String.class, String.class},
|
|
||||||
new RemoteParam(), policyName);
|
|
||||||
rpcClient.invokeSequential(locations, method, null, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.READ);
|
return storagePolicy.getStoragePolicies();
|
||||||
|
|
||||||
RemoteMethod method = new RemoteMethod("getStoragePolicies");
|
|
||||||
String ns = subclusterResolver.getDefaultNamespace();
|
|
||||||
return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1463,13 +1456,12 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unsetStoragePolicy(String src) throws IOException {
|
public void unsetStoragePolicy(String src) throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
|
storagePolicy.unsetStoragePolicy(src);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
|
return storagePolicy.getStoragePolicy(path);
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1557,7 +1549,7 @@ public void msync() throws IOException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void satisfyStoragePolicy(String path) throws IOException {
|
public void satisfyStoragePolicy(String path) throws IOException {
|
||||||
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
|
storagePolicy.satisfyStoragePolicy(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -0,0 +1,98 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Module that implements all the RPC calls in
|
||||||
|
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
|
||||||
|
* Storage Policy in the {@link RouterRpcServer}.
|
||||||
|
*/
|
||||||
|
public class RouterStoragePolicy {
|
||||||
|
|
||||||
|
/** RPC server to receive client calls. */
|
||||||
|
private final RouterRpcServer rpcServer;
|
||||||
|
/** RPC clients to connect to the Namenodes. */
|
||||||
|
private final RouterRpcClient rpcClient;
|
||||||
|
/** Interface to map global name space to HDFS subcluster name spaces. */
|
||||||
|
private final FileSubclusterResolver subclusterResolver;
|
||||||
|
|
||||||
|
public RouterStoragePolicy(RouterRpcServer server) {
|
||||||
|
this.rpcServer = server;
|
||||||
|
this.rpcClient = this.rpcServer.getRPCClient();
|
||||||
|
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStoragePolicy(String src, String policyName)
|
||||||
|
throws IOException {
|
||||||
|
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
|
||||||
|
|
||||||
|
List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
|
||||||
|
RemoteMethod method = new RemoteMethod("setStoragePolicy",
|
||||||
|
new Class<?>[] {String.class, String.class},
|
||||||
|
new RemoteParam(),
|
||||||
|
policyName);
|
||||||
|
rpcClient.invokeSequential(locations, method, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||||
|
rpcServer.checkOperation(NameNode.OperationCategory.READ);
|
||||||
|
|
||||||
|
RemoteMethod method = new RemoteMethod("getStoragePolicies");
|
||||||
|
String ns = subclusterResolver.getDefaultNamespace();
|
||||||
|
return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unsetStoragePolicy(String src) throws IOException {
|
||||||
|
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
|
||||||
|
|
||||||
|
List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
|
||||||
|
RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
|
||||||
|
new Class<?>[] {String.class},
|
||||||
|
new RemoteParam());
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockStoragePolicy getStoragePolicy(String path)
|
||||||
|
throws IOException {
|
||||||
|
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
|
||||||
|
|
||||||
|
List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false);
|
||||||
|
RemoteMethod method = new RemoteMethod("getStoragePolicy",
|
||||||
|
new Class<?>[] {String.class},
|
||||||
|
new RemoteParam());
|
||||||
|
return (BlockStoragePolicy) rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void satisfyStoragePolicy(String path) throws IOException {
|
||||||
|
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
|
||||||
|
|
||||||
|
List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, true);
|
||||||
|
RemoteMethod method = new RemoteMethod("satisfyStoragePolicy",
|
||||||
|
new Class<?>[] {String.class},
|
||||||
|
new RemoteParam());
|
||||||
|
rpcClient.invokeSequential(locations, method);
|
||||||
|
}
|
||||||
|
}
|
@ -67,6 +67,7 @@
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
@ -118,6 +119,8 @@ public class MiniRouterDFSCluster {
|
|||||||
private boolean highAvailability;
|
private boolean highAvailability;
|
||||||
/** Number of datanodes per nameservice. */
|
/** Number of datanodes per nameservice. */
|
||||||
private int numDatanodesPerNameservice = 2;
|
private int numDatanodesPerNameservice = 2;
|
||||||
|
/** Custom storage type for each datanode. */
|
||||||
|
private StorageType[][] storageTypes = null;
|
||||||
|
|
||||||
/** Mini cluster. */
|
/** Mini cluster. */
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
@ -614,6 +617,15 @@ public void setNumDatanodesPerNameservice(int num) {
|
|||||||
this.numDatanodesPerNameservice = num;
|
this.numDatanodesPerNameservice = num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set custom storage type configuration for each datanode.
|
||||||
|
* If storageTypes is uninitialized or passed null then
|
||||||
|
* StorageType.DEFAULT is used.
|
||||||
|
*/
|
||||||
|
public void setStorageTypes(StorageType[][] storageTypes) {
|
||||||
|
this.storageTypes = storageTypes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the DNs to belong to only one subcluster.
|
* Set the DNs to belong to only one subcluster.
|
||||||
*/
|
*/
|
||||||
@ -767,6 +779,7 @@ public void startCluster(Configuration overrideConf) {
|
|||||||
.numDataNodes(numDNs)
|
.numDataNodes(numDNs)
|
||||||
.nnTopology(topology)
|
.nnTopology(topology)
|
||||||
.dataNodeConfOverlays(dnConfs)
|
.dataNodeConfOverlays(dnConfs)
|
||||||
|
.storageTypes(storageTypes)
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
@ -72,6 +72,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
@ -769,6 +770,62 @@ public void testProxyStoragePolicy() throws Exception {
|
|||||||
m, new Object[] {badPath, "badpolicy"});
|
m, new Object[] {badPath, "badpolicy"});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProxyGetAndUnsetStoragePolicy() throws Exception {
|
||||||
|
String file = "/testGetStoragePolicy";
|
||||||
|
String nnFilePath = cluster.getNamenodeTestDirectoryForNS(ns) + file;
|
||||||
|
String routerFilePath = cluster.getFederatedTestDirectoryForNS(ns) + file;
|
||||||
|
|
||||||
|
createFile(routerFS, routerFilePath, 32);
|
||||||
|
|
||||||
|
// Get storage policy via router
|
||||||
|
BlockStoragePolicy policy = routerProtocol.getStoragePolicy(routerFilePath);
|
||||||
|
// Verify default policy is HOT
|
||||||
|
assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
|
||||||
|
assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
|
||||||
|
|
||||||
|
// Get storage policies via router
|
||||||
|
BlockStoragePolicy[] policies = routerProtocol.getStoragePolicies();
|
||||||
|
BlockStoragePolicy[] nnPolicies = namenode.getClient().getStoragePolicies();
|
||||||
|
// Verify policie returned by router is same as policies returned by NN
|
||||||
|
assertArrayEquals(nnPolicies, policies);
|
||||||
|
|
||||||
|
BlockStoragePolicy newPolicy = policies[0];
|
||||||
|
while (newPolicy.isCopyOnCreateFile()) {
|
||||||
|
// Pick a non copy on create policy. Beacuse if copyOnCreateFile is set
|
||||||
|
// then the policy cannot be changed after file creation.
|
||||||
|
Random rand = new Random();
|
||||||
|
int randIndex = rand.nextInt(policies.length);
|
||||||
|
newPolicy = policies[randIndex];
|
||||||
|
}
|
||||||
|
routerProtocol.setStoragePolicy(routerFilePath, newPolicy.getName());
|
||||||
|
|
||||||
|
// Get storage policy via router
|
||||||
|
policy = routerProtocol.getStoragePolicy(routerFilePath);
|
||||||
|
// Verify default policy
|
||||||
|
assertEquals(newPolicy.getName(), policy.getName());
|
||||||
|
assertEquals(newPolicy.getId(), policy.getId());
|
||||||
|
|
||||||
|
// Verify policy via NN
|
||||||
|
BlockStoragePolicy nnPolicy =
|
||||||
|
namenode.getClient().getStoragePolicy(nnFilePath);
|
||||||
|
assertEquals(nnPolicy.getName(), policy.getName());
|
||||||
|
assertEquals(nnPolicy.getId(), policy.getId());
|
||||||
|
|
||||||
|
// Unset storage policy via router
|
||||||
|
routerProtocol.unsetStoragePolicy(routerFilePath);
|
||||||
|
|
||||||
|
// Get storage policy
|
||||||
|
policy = routerProtocol.getStoragePolicy(routerFilePath);
|
||||||
|
assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
|
||||||
|
assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
|
||||||
|
|
||||||
|
// Verify policy via NN
|
||||||
|
nnPolicy = namenode.getClient().getStoragePolicy(nnFilePath);
|
||||||
|
assertEquals(nnPolicy.getName(), policy.getName());
|
||||||
|
assertEquals(nnPolicy.getId(), policy.getId());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProxyGetPreferedBlockSize() throws Exception {
|
public void testProxyGetPreferedBlockSize() throws Exception {
|
||||||
|
|
||||||
|
@ -0,0 +1,149 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||||
|
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test StoragePolicySatisfy through router rpc calls.
|
||||||
|
*/
|
||||||
|
public class TestRouterRpcStoragePolicySatisfier {
|
||||||
|
|
||||||
|
/** Federated HDFS cluster. */
|
||||||
|
private static MiniRouterDFSCluster cluster;
|
||||||
|
|
||||||
|
/** Client interface to the Router. */
|
||||||
|
private static ClientProtocol routerProtocol;
|
||||||
|
|
||||||
|
/** Filesystem interface to the Router. */
|
||||||
|
private static FileSystem routerFS;
|
||||||
|
/** Filesystem interface to the Namenode. */
|
||||||
|
private static FileSystem nnFS;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void globalSetUp() throws Exception {
|
||||||
|
cluster = new MiniRouterDFSCluster(false, 1);
|
||||||
|
// Set storage types for the cluster
|
||||||
|
StorageType[][] newtypes = new StorageType[][] {
|
||||||
|
{StorageType.ARCHIVE, StorageType.DISK}};
|
||||||
|
cluster.setStorageTypes(newtypes);
|
||||||
|
|
||||||
|
Configuration conf = cluster.getNamenodes().get(0).getConf();
|
||||||
|
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
||||||
|
HdfsConstants.StoragePolicySatisfierMode.EXTERNAL.toString());
|
||||||
|
// Reduced refresh cycle to update latest datanodes.
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
|
||||||
|
1000);
|
||||||
|
cluster.addNamenodeOverrides(conf);
|
||||||
|
|
||||||
|
cluster.setNumDatanodesPerNameservice(1);
|
||||||
|
|
||||||
|
// Start NNs and DNs and wait until ready
|
||||||
|
cluster.startCluster();
|
||||||
|
|
||||||
|
// Start routers with only an RPC service
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.metrics()
|
||||||
|
.rpc()
|
||||||
|
.build();
|
||||||
|
// We decrease the DN cache times to make the test faster
|
||||||
|
routerConf.setTimeDuration(
|
||||||
|
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
|
||||||
|
cluster.addRouterOverrides(routerConf);
|
||||||
|
cluster.startRouters();
|
||||||
|
|
||||||
|
// Register and verify all NNs with all routers
|
||||||
|
cluster.registerNamenodes();
|
||||||
|
cluster.waitNamenodeRegistration();
|
||||||
|
|
||||||
|
// Create mock locations
|
||||||
|
cluster.installMockLocations();
|
||||||
|
|
||||||
|
// Random router for this test
|
||||||
|
MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter();
|
||||||
|
|
||||||
|
routerProtocol = rndRouter.getClient().getNamenode();
|
||||||
|
routerFS = rndRouter.getFileSystem();
|
||||||
|
nnFS = cluster.getNamenodes().get(0).getFileSystem();
|
||||||
|
|
||||||
|
NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
|
||||||
|
HdfsServerConstants.MOVER_ID_PATH, 1, false);
|
||||||
|
|
||||||
|
StoragePolicySatisfier externalSps = new StoragePolicySatisfier(conf);
|
||||||
|
Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
|
||||||
|
|
||||||
|
externalSps.init(externalCtxt);
|
||||||
|
externalSps.start(HdfsConstants.StoragePolicySatisfierMode.EXTERNAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStoragePolicySatisfier() throws Exception {
|
||||||
|
final String file = "/testStoragePolicySatisfierCommand";
|
||||||
|
short repl = 1;
|
||||||
|
int size = 32;
|
||||||
|
DFSTestUtil.createFile(routerFS, new Path(file), size, repl, 0);
|
||||||
|
// Varify storage type is DISK
|
||||||
|
DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 1, 20000,
|
||||||
|
(DistributedFileSystem) routerFS);
|
||||||
|
// Set storage policy as COLD
|
||||||
|
routerProtocol
|
||||||
|
.setStoragePolicy(file, HdfsConstants.COLD_STORAGE_POLICY_NAME);
|
||||||
|
// Verify storage policy is set properly
|
||||||
|
BlockStoragePolicy storagePolicy = routerProtocol.getStoragePolicy(file);
|
||||||
|
assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
||||||
|
storagePolicy.getName());
|
||||||
|
// Invoke satisfy storage policy
|
||||||
|
routerProtocol.satisfyStoragePolicy(file);
|
||||||
|
// Verify storage type is ARCHIVE
|
||||||
|
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
|
||||||
|
(DistributedFileSystem) routerFS);
|
||||||
|
|
||||||
|
// Verify storage type via NN
|
||||||
|
DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
|
||||||
|
(DistributedFileSystem) nnFS);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user