HDFS-13077. [SPS]: Fix review comments of external storage policy satisfier. Contributed by Rakesh R.
This commit is contained in:
parent
5845c36c16
commit
d3de4fb2a0
@ -614,7 +614,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
|
||||
public static final int DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||
|
||||
// SPS related configurations
|
||||
// StoragePolicySatisfier (SPS) related configurations
|
||||
public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_KEY =
|
||||
"dfs.storage.policy.satisfier.mode";
|
||||
public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT =
|
||||
@ -643,6 +643,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
"dfs.storage.policy.satisfier.low.max-streams.preference";
|
||||
public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
|
||||
true;
|
||||
public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
|
||||
"dfs.storage.policy.satisfier.max.outstanding.paths";
|
||||
public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
|
||||
|
||||
// SPS keytab configurations, by default it is disabled.
|
||||
public static final String DFS_SPS_ADDRESS_KEY =
|
||||
"dfs.storage.policy.satisfier.address";
|
||||
public static final String DFS_SPS_ADDRESS_DEFAULT= "0.0.0.0:0";
|
||||
public static final String DFS_SPS_KEYTAB_FILE_KEY =
|
||||
"dfs.storage.policy.satisfier.keytab.file";
|
||||
public static final String DFS_SPS_KERBEROS_PRINCIPAL_KEY =
|
||||
"dfs.storage.policy.satisfier.kerberos.principal";
|
||||
|
||||
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
|
||||
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
|
||||
|
@ -439,6 +439,7 @@ public long getTotalECBlockGroups() {
|
||||
private final boolean storagePolicyEnabled;
|
||||
private StoragePolicySatisfierMode spsMode;
|
||||
private SPSPathIds spsPaths;
|
||||
private final int spsOutstandingPathsLimit;
|
||||
|
||||
/** Minimum live replicas needed for the datanode to be transitioned
|
||||
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
|
||||
@ -478,14 +479,16 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
|
||||
* 1000L);
|
||||
|
||||
// StoragePolicySatisfier(SPS) configs
|
||||
storagePolicyEnabled =
|
||||
conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
|
||||
String spsModeVal =
|
||||
conf.get(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
|
||||
String spsModeVal = conf.get(
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
|
||||
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
|
||||
spsOutstandingPathsLimit = conf.getInt(
|
||||
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
|
||||
DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
|
||||
spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
|
||||
spsPaths = new SPSPathIds();
|
||||
sps = new StoragePolicySatisfier(conf);
|
||||
@ -5188,6 +5191,12 @@ public boolean isStoragePolicySatisfierRunning() {
|
||||
*/
|
||||
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
|
||||
String path) throws IOException {
|
||||
if (spsMode != StoragePolicySatisfierMode.INTERNAL) {
|
||||
LOG.debug("Satisfier is not running inside namenode, so status "
|
||||
+ "can't be returned.");
|
||||
throw new IOException("Satisfier is not running inside namenode, "
|
||||
+ "so status can't be returned.");
|
||||
}
|
||||
return sps.checkStoragePolicySatisfyPathStatus(path);
|
||||
}
|
||||
|
||||
@ -5206,6 +5215,20 @@ public Long getNextSPSPathId() {
|
||||
return spsPaths.pollNext();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that satisfier queue limit exceeds allowed outstanding limit.
|
||||
*/
|
||||
public void verifyOutstandingSPSPathQLimit() throws IOException {
|
||||
long size = spsPaths.size();
|
||||
// Checking that the SPS call Q exceeds the allowed limit.
|
||||
if (spsOutstandingPathsLimit - size <= 0) {
|
||||
LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
|
||||
spsOutstandingPathsLimit, size);
|
||||
throw new IOException("Outstanding satisfier queue limit: "
|
||||
+ spsOutstandingPathsLimit + " exceeded, try later!");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the SPS path id from the list of sps paths.
|
||||
*/
|
||||
|
@ -45,6 +45,21 @@ final class FSDirSatisfyStoragePolicyOp {
|
||||
private FSDirSatisfyStoragePolicyOp() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Satisfy storage policy function which will add the entry to SPS call queue
|
||||
* and will perform satisfaction async way.
|
||||
*
|
||||
* @param fsd
|
||||
* fs directory
|
||||
* @param bm
|
||||
* block manager
|
||||
* @param src
|
||||
* source path
|
||||
* @param logRetryCache
|
||||
* whether to record RPC ids in editlog for retry cache rebuilding
|
||||
* @return file status info
|
||||
* @throws IOException
|
||||
*/
|
||||
static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
|
||||
String src, boolean logRetryCache) throws IOException {
|
||||
|
||||
|
@ -2253,28 +2253,12 @@ void satisfyStoragePolicy(String src, boolean logRetryCache)
|
||||
throws IOException {
|
||||
final String operationName = "satisfyStoragePolicy";
|
||||
FileStatus auditStat;
|
||||
validateStoragePolicySatisfy();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot satisfy storage policy for " + src);
|
||||
// make sure storage policy is enabled, otherwise
|
||||
// there is no need to satisfy storage policy.
|
||||
if (!dir.isStoragePolicyEnabled()) {
|
||||
throw new IOException(String.format(
|
||||
"Failed to satisfy storage policy since %s is set to false.",
|
||||
DFS_STORAGE_POLICY_ENABLED_KEY));
|
||||
}
|
||||
|
||||
if (!blockManager.isSPSEnabled()
|
||||
|| (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
|
||||
&& !blockManager.getStoragePolicySatisfier().isRunning())) {
|
||||
throw new UnsupportedActionException(
|
||||
"Cannot request to satisfy storage policy "
|
||||
+ "when storage policy satisfier feature has been disabled"
|
||||
+ " by admin. Seek for an admin help to enable it "
|
||||
+ "or use Mover tool.");
|
||||
}
|
||||
auditStat = FSDirSatisfyStoragePolicyOp.satisfyStoragePolicy(
|
||||
dir, blockManager, src, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
@ -2287,6 +2271,29 @@ void satisfyStoragePolicy(String src, boolean logRetryCache)
|
||||
logAuditEvent(true, operationName, src, null, auditStat);
|
||||
}
|
||||
|
||||
private void validateStoragePolicySatisfy()
|
||||
throws UnsupportedActionException, IOException {
|
||||
// make sure storage policy is enabled, otherwise
|
||||
// there is no need to satisfy storage policy.
|
||||
if (!dir.isStoragePolicyEnabled()) {
|
||||
throw new IOException(String.format(
|
||||
"Failed to satisfy storage policy since %s is set to false.",
|
||||
DFS_STORAGE_POLICY_ENABLED_KEY));
|
||||
}
|
||||
// checks sps status
|
||||
if (!blockManager.isSPSEnabled()
|
||||
|| (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
|
||||
&& !blockManager.getStoragePolicySatisfier().isRunning())) {
|
||||
throw new UnsupportedActionException(
|
||||
"Cannot request to satisfy storage policy "
|
||||
+ "when storage policy satisfier feature has been disabled"
|
||||
+ " by admin. Seek for an admin help to enable it "
|
||||
+ "or use Mover tool.");
|
||||
}
|
||||
// checks SPS Q has many outstanding requests.
|
||||
blockManager.verifyOutstandingSPSPathQLimit();
|
||||
}
|
||||
|
||||
/**
|
||||
* unset storage policy set for a given file or a directory.
|
||||
*
|
||||
|
@ -110,6 +110,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
@ -2578,6 +2579,16 @@ public Long getNextSPSPathId() throws IOException {
|
||||
if (nn.isStandbyState()) {
|
||||
throw new StandbyException("Not supported by Standby Namenode.");
|
||||
}
|
||||
// Check that internal SPS service is running
|
||||
if (namesystem.getBlockManager()
|
||||
.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
|
||||
&& namesystem.getBlockManager().getSPSService().isRunning()) {
|
||||
LOG.debug("SPS service is internally enabled and running inside "
|
||||
+ "namenode, so external SPS is not allowed to fetch the path Ids");
|
||||
throw new IOException("SPS service is internally enabled and running"
|
||||
+ " inside namenode, so external SPS is not allowed to fetch"
|
||||
+ " the path Ids");
|
||||
}
|
||||
return namesystem.getBlockManager().getNextSPSPathId();
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,6 @@
|
||||
public class SPSPathIds {
|
||||
|
||||
// List of pending dir to satisfy the policy
|
||||
// TODO: Make this bounded queue.
|
||||
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
|
||||
|
||||
/**
|
||||
@ -61,4 +60,11 @@ public synchronized void clear() {
|
||||
public synchronized Long pollNext() {
|
||||
return spsDirsToBeTraveresed.poll();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the size of the queue.
|
||||
*/
|
||||
public synchronized long size() {
|
||||
return spsDirsToBeTraveresed.size();
|
||||
}
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
|
||||
private int blockMovementMaxRetry;
|
||||
private Context ctxt;
|
||||
private BlockMoveTaskHandler blockMoveTaskHandler;
|
||||
private Configuration conf;
|
||||
private final Configuration conf;
|
||||
|
||||
public StoragePolicySatisfier(Configuration conf) {
|
||||
this.conf = conf;
|
||||
@ -441,8 +441,8 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
|
||||
liveDns, ecPolicy);
|
||||
if (blocksPaired) {
|
||||
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
|
||||
} else
|
||||
if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
|
||||
} else if (status !=
|
||||
BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
|
||||
// Check if the previous block was successfully paired. Here the
|
||||
// status will set to NO_BLOCKS_TARGETS_PAIRED only when none of the
|
||||
// blocks of a file found its eligible targets to satisfy the storage
|
||||
|
@ -175,6 +175,10 @@ public boolean hasLowRedundancyBlocks(long inodeID) {
|
||||
@Override
|
||||
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
|
||||
long estimatedSize) {
|
||||
// TODO: Instead of calling namenode for checking the available space, it
|
||||
// can be optimized by maintaining local cache of datanode storage report
|
||||
// and do the computations. This local cache can be refreshed per file or
|
||||
// periodic fashion.
|
||||
try {
|
||||
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
|
||||
estimatedSize);
|
||||
|
@ -20,6 +20,7 @@
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -28,6 +29,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
@ -36,6 +38,9 @@
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -44,20 +49,25 @@
|
||||
* This class starts and runs external SPS service.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ExternalStoragePolicySatisfier {
|
||||
public final class ExternalStoragePolicySatisfier {
|
||||
public static final Logger LOG = LoggerFactory
|
||||
.getLogger(ExternalStoragePolicySatisfier.class);
|
||||
|
||||
private ExternalStoragePolicySatisfier() {
|
||||
// This is just a class to start and run external sps.
|
||||
}
|
||||
|
||||
/**
|
||||
* Main method to start SPS service.
|
||||
*/
|
||||
public static void main(String args[]) throws Exception {
|
||||
public static void main(String[] args) throws Exception {
|
||||
NameNodeConnector nnc = null;
|
||||
try {
|
||||
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
|
||||
LOG);
|
||||
HdfsConfiguration spsConf = new HdfsConfiguration();
|
||||
//TODO : login with SPS keytab
|
||||
// login with SPS keytab
|
||||
secureLogin(spsConf);
|
||||
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
|
||||
nnc = getNameNodeConnector(spsConf);
|
||||
|
||||
@ -92,6 +102,18 @@ public static void main(String args[]) throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private static void secureLogin(Configuration conf)
|
||||
throws IOException {
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
String addr = conf.get(DFSConfigKeys.DFS_SPS_ADDRESS_KEY,
|
||||
DFSConfigKeys.DFS_SPS_ADDRESS_DEFAULT);
|
||||
InetSocketAddress socAddr = NetUtils.createSocketAddr(addr, 0,
|
||||
DFSConfigKeys.DFS_SPS_ADDRESS_KEY);
|
||||
SecurityUtil.login(conf, DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY,
|
||||
DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY,
|
||||
socAddr.getHostName());
|
||||
}
|
||||
|
||||
private static NameNodeConnector getNameNodeConnector(Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||
@ -100,7 +122,7 @@ private static NameNodeConnector getNameNodeConnector(Configuration conf)
|
||||
try {
|
||||
final List<NameNodeConnector> nncs = NameNodeConnector
|
||||
.newNameNodeConnectors(namenodes,
|
||||
StoragePolicySatisfier.class.getSimpleName(),
|
||||
ExternalStoragePolicySatisfier.class.getSimpleName(),
|
||||
externalSPSPathId, conf,
|
||||
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
|
||||
return nncs.get(0);
|
||||
|
@ -134,8 +134,9 @@ public MiniDFSCluster getCluster() {
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void getFS() throws IOException {
|
||||
public DistributedFileSystem getFS() throws IOException {
|
||||
this.dfs = hdfsCluster.getFileSystem();
|
||||
return this.dfs;
|
||||
}
|
||||
|
||||
@After
|
||||
@ -423,9 +424,9 @@ public void testSatisfyWithExceptions() throws Exception {
|
||||
+ "for %s since %s is set to false.",
|
||||
FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage().contains(String.format(
|
||||
GenericTestUtils.assertExceptionContains(String.format(
|
||||
"Failed to satisfy storage policy since %s is set to false.",
|
||||
DFS_STORAGE_POLICY_ENABLED_KEY)));
|
||||
DFS_STORAGE_POLICY_ENABLED_KEY), e);
|
||||
}
|
||||
|
||||
hdfsCluster.getConfiguration(0).
|
||||
|
@ -17,17 +17,40 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.sps;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||
@ -39,8 +62,17 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
|
||||
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.util.KerberosName;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests the external sps service plugins.
|
||||
@ -52,6 +84,18 @@ public class TestExternalStoragePolicySatisfier
|
||||
{StorageType.DISK, StorageType.DISK},
|
||||
{StorageType.DISK, StorageType.DISK}};
|
||||
private NameNodeConnector nnc;
|
||||
private File keytabFile;
|
||||
private String principal;
|
||||
private MiniKdc kdc;
|
||||
private File baseDir;
|
||||
|
||||
@After
|
||||
public void destroy() throws Exception {
|
||||
if (kdc != null) {
|
||||
kdc.stop();
|
||||
FileUtil.fullyDelete(baseDir);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUp() {
|
||||
@ -178,6 +222,150 @@ private NameNodeConnector getNameNodeConnector(Configuration conf)
|
||||
}
|
||||
}
|
||||
|
||||
private void initSecureConf(Configuration conf) throws Exception {
|
||||
String username = "externalSPS";
|
||||
baseDir = GenericTestUtils
|
||||
.getTestDir(TestExternalStoragePolicySatisfier.class.getSimpleName());
|
||||
FileUtil.fullyDelete(baseDir);
|
||||
Assert.assertTrue(baseDir.mkdirs());
|
||||
|
||||
Properties kdcConf = MiniKdc.createConf();
|
||||
kdc = new MiniKdc(kdcConf, baseDir);
|
||||
kdc.start();
|
||||
|
||||
SecurityUtil.setAuthenticationMethod(
|
||||
UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
KerberosName.resetDefaultRealm();
|
||||
Assert.assertTrue("Expected configuration to enable security",
|
||||
UserGroupInformation.isSecurityEnabled());
|
||||
|
||||
keytabFile = new File(baseDir, username + ".keytab");
|
||||
String keytab = keytabFile.getAbsolutePath();
|
||||
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
|
||||
String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
|
||||
principal = username + "/" + krbInstance + "@" + kdc.getRealm();
|
||||
String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
|
||||
kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
|
||||
"HTTP/" + krbInstance);
|
||||
|
||||
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
|
||||
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
|
||||
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
|
||||
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
|
||||
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
|
||||
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
|
||||
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
|
||||
|
||||
conf.set(DFS_SPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.set(DFS_SPS_KEYTAB_FILE_KEY, keytab);
|
||||
conf.set(DFS_SPS_KERBEROS_PRINCIPAL_KEY, principal);
|
||||
|
||||
String keystoresDir = baseDir.getAbsolutePath();
|
||||
String sslConfDir = KeyStoreTestUtil
|
||||
.getClasspathDir(TestExternalStoragePolicySatisfier.class);
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
|
||||
|
||||
conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||
KeyStoreTestUtil.getClientSSLConfigFileName());
|
||||
conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
|
||||
KeyStoreTestUtil.getServerSSLConfigFileName());
|
||||
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test SPS runs fine when logging in with a keytab in kerberized env. Reusing
|
||||
* testWhenStoragePolicySetToALLSSD here for basic functionality testing.
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testWithKeytabs() throws Exception {
|
||||
try {
|
||||
initSecureConf(getConf());
|
||||
final UserGroupInformation ugi = UserGroupInformation
|
||||
.loginUserFromKeytabAndReturnUGI(principal,
|
||||
keytabFile.getAbsolutePath());
|
||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
// verify that sps runs Ok.
|
||||
testWhenStoragePolicySetToALLSSD();
|
||||
// verify that UGI was logged in using keytab.
|
||||
Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
// Reset UGI so that other tests are not affected.
|
||||
UserGroupInformation.reset();
|
||||
UserGroupInformation.setConfiguration(new Configuration());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test verifies that SPS call will throw exception if the call Q exceeds
|
||||
* OutstandingQueueLimit value.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testOutstandingQueueLimitExceeds() throws Exception {
|
||||
try {
|
||||
getConf().setInt(DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, 3);
|
||||
createCluster();
|
||||
List<String> files = new ArrayList<>();
|
||||
files.add(FILE);
|
||||
DistributedFileSystem fs = getFS();
|
||||
BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
|
||||
.getBlockManager();
|
||||
SPSService spsService = blkMgr.getSPSService();
|
||||
spsService.stopGracefully(); // stops SPS
|
||||
|
||||
// Creates 4 more files. Send all of them for satisfying the storage
|
||||
// policy together.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
String file1 = "/testOutstandingQueueLimitExceeds_" + i;
|
||||
files.add(file1);
|
||||
writeContent(file1);
|
||||
fs.satisfyStoragePolicy(new Path(file1));
|
||||
}
|
||||
String fileExceeds = "/testOutstandingQueueLimitExceeds_" + 4;
|
||||
files.add(fileExceeds);
|
||||
writeContent(fileExceeds);
|
||||
try {
|
||||
fs.satisfyStoragePolicy(new Path(fileExceeds));
|
||||
Assert.fail("Should throw exception as it exceeds "
|
||||
+ "outstanding SPS call Q limit");
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Outstanding satisfier queue limit: 3 exceeded, try later!", ioe);
|
||||
}
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test verifies status check when Satisfier is not running inside namenode.
|
||||
*/
|
||||
@Test(timeout = 90000)
|
||||
public void testStoragePolicySatisfyPathStatus() throws Exception {
|
||||
createCluster();
|
||||
DistributedFileSystem fs = getFS();
|
||||
try {
|
||||
fs.getClient().checkStoragePolicySatisfyPathStatus(FILE);
|
||||
Assert.fail("Should throw exception as SPS is not running inside NN!");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("Satisfier is not running"
|
||||
+ " inside namenode, so status can't be returned.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test need not run as external scan is not a batch based scanning right
|
||||
* now.
|
||||
@ -187,13 +375,6 @@ private NameNodeConnector getNameNodeConnector(Configuration conf)
|
||||
public void testBatchProcessingForSPSDirectory() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Status won't be supported for external SPS, now. So, ignoring it.
|
||||
*/
|
||||
@Ignore("Status is not supported for external SPS. So, ignoring it.")
|
||||
public void testStoragePolicySatisfyPathStatus() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* This test case is more specific to internal.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user