HDFS-15154. Allow only hdfs superusers the ability to assign HDFS storage policies. Contributed by Siddharth Wagle.

Change-Id: I32d6dd2837945b8fc026a759aa367c55daefe348
This commit is contained in:
Arpit Agarwal 2020-03-25 10:28:30 -07:00
parent e3fbdcbc14
commit a700803a18
No known key found for this signature in database
GPG Key ID: E4B09E903FDF9E98
6 changed files with 222 additions and 34 deletions

View File

@ -1114,6 +1114,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled"; public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true; public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
public static final String DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY =
"dfs.storage.policy.permissions.superuser-only";
public static final boolean
DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT = false;
public static final String DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY = "dfs.quota.by.storage.type.enabled"; public static final String DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY = "dfs.quota.by.storage.type.enabled";
public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true; public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true;

View File

@ -47,7 +47,6 @@
import java.util.List; import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
public class FSDirAttrOp { public class FSDirAttrOp {
static FileStatus setPermission( static FileStatus setPermission(
@ -151,7 +150,7 @@ static boolean setReplication(
static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc, static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
BlockManager bm, String src) throws IOException { BlockManager bm, String src) throws IOException {
return setStoragePolicy(fsd, pc, bm, src, return setStoragePolicy(fsd, pc, bm, src,
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, "unset"); HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
} }
static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc, static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
@ -162,17 +161,12 @@ static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + policyName); "Cannot find a block policy with the name " + policyName);
} }
return setStoragePolicy(fsd, pc, bm, src, policy.getId(), "set"); return setStoragePolicy(fsd, pc, bm, src, policy.getId());
} }
static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc, static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
BlockManager bm, String src, final byte policyId, final String operation) BlockManager bm, String src, final byte policyId)
throws IOException { throws IOException {
if (!fsd.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to %s storage policy since %s is set to false.", operation,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
INodesInPath iip; INodesInPath iip;
fsd.writeLock(); fsd.writeLock();
try { try {

View File

@ -88,8 +88,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@ -188,8 +186,6 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
// precision of access times. // precision of access times.
private final long accessTimePrecision; private final long accessTimePrecision;
// whether setStoragePolicy is allowed.
private final boolean storagePolicyEnabled;
// whether quota by storage type is allowed // whether quota by storage type is allowed
private final boolean quotaByStorageTypeEnabled; private final boolean quotaByStorageTypeEnabled;
@ -318,10 +314,6 @@ public enum DirOp {
DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT); DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
this.storagePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.quotaByStorageTypeEnabled = this.quotaByStorageTypeEnabled =
conf.getBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, conf.getBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY,
DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT); DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT);
@ -438,7 +430,7 @@ HdfsFileStatus[] getReservedStatuses() {
* These statuses are solely for listing purpose. All other operations * These statuses are solely for listing purpose. All other operations
* on the reserved dirs are disallowed. * on the reserved dirs are disallowed.
* Operations on sub directories are resolved by * Operations on sub directories are resolved by
* {@link FSDirectory#resolvePath(String, byte[][], FSDirectory)} * {@link FSDirectory#resolvePath(String, FSDirectory)}
* and conducted directly, without the need to check the reserved dirs. * and conducted directly, without the need to check the reserved dirs.
* *
* This method should only be invoked once during namenode initialization. * This method should only be invoked once during namenode initialization.
@ -568,9 +560,6 @@ boolean isXattrsEnabled() {
return xattrsEnabled; return xattrsEnabled;
} }
int getXattrMaxSize() { return xattrMaxSize; } int getXattrMaxSize() { return xattrMaxSize; }
boolean isStoragePolicyEnabled() {
return storagePolicyEnabled;
}
boolean isAccessTimeSupported() { boolean isAccessTimeSupported() {
return accessTimePrecision > 0; return accessTimePrecision > 0;
} }

View File

@ -30,6 +30,9 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
@ -92,6 +95,8 @@
import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry; import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.text.CaseUtils;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
@ -449,6 +454,8 @@ private void logAuditEvent(boolean succeeded,
private final int maxCorruptFileBlocksReturn; private final int maxCorruptFileBlocksReturn;
private final boolean isPermissionEnabled; private final boolean isPermissionEnabled;
private final boolean isStoragePolicyEnabled;
private final boolean isStoragePolicySuperuserOnly;
private final UserGroupInformation fsOwner; private final UserGroupInformation fsOwner;
private final String supergroup; private final String supergroup;
private final boolean standbyShouldCheckpoint; private final boolean standbyShouldCheckpoint;
@ -825,6 +832,14 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY, this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
DFS_PERMISSIONS_ENABLED_DEFAULT); DFS_PERMISSIONS_ENABLED_DEFAULT);
this.isStoragePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.isStoragePolicySuperuserOnly =
conf.getBoolean(DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY,
DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT);
this.snapshotDiffReportLimit = this.snapshotDiffReportLimit =
conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT, conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT,
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT); DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT);
@ -832,6 +847,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
LOG.info("fsOwner = " + fsOwner); LOG.info("fsOwner = " + fsOwner);
LOG.info("supergroup = " + supergroup); LOG.info("supergroup = " + supergroup);
LOG.info("isPermissionEnabled = " + isPermissionEnabled); LOG.info("isPermissionEnabled = " + isPermissionEnabled);
LOG.info("isStoragePolicyEnabled = " + isStoragePolicyEnabled);
// block allocation has to be persisted in HA using a shared edits directory // block allocation has to be persisted in HA using a shared edits directory
// so that the standby has up-to-date namespace information // so that the standby has up-to-date namespace information
@ -2326,6 +2342,27 @@ boolean setReplication(final String src, final short replication)
return success; return success;
} }
/**
* Verify storage policies are enabled and if only super user is allowed to
* set storage policies.
*
* @param operationNameReadable Name of storage policy for exception text
* @param checkSuperUser Whether to check for super user privilege
* @throws IOException
*/
private void checkStoragePolicyEnabled(final String operationNameReadable,
boolean checkSuperUser) throws IOException {
if (!isStoragePolicyEnabled) {
throw new IOException(String.format(
"Failed to %s since %s is set to false.", operationNameReadable,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
if (checkSuperUser && isStoragePolicySuperuserOnly) {
checkSuperuserPrivilege(
CaseUtils.toCamelCase(operationNameReadable, false));
}
}
/** /**
* Set the storage policy for a file or a directory. * Set the storage policy for a file or a directory.
* *
@ -2335,10 +2372,11 @@ boolean setReplication(final String src, final short replication)
*/ */
void setStoragePolicy(String src, String policyName) throws IOException { void setStoragePolicy(String src, String policyName) throws IOException {
final String operationName = "setStoragePolicy"; final String operationName = "setStoragePolicy";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkStoragePolicyEnabled("set storage policy", true);
final FSPermissionChecker pc = getPermissionChecker(); final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(operationName); FSPermissionChecker.setOperationType(operationName);
FileStatus auditStat;
try { try {
writeLock(); writeLock();
try { try {
@ -2366,9 +2404,12 @@ void setStoragePolicy(String src, String policyName) throws IOException {
void satisfyStoragePolicy(String src, boolean logRetryCache) void satisfyStoragePolicy(String src, boolean logRetryCache)
throws IOException { throws IOException {
final String operationName = "satisfyStoragePolicy"; final String operationName = "satisfyStoragePolicy";
checkOperation(OperationCategory.WRITE);
// make sure storage policy is enabled, otherwise
// there is no need to satisfy storage policy.
checkStoragePolicyEnabled("satisfy storage policy", false);
FileStatus auditStat; FileStatus auditStat;
validateStoragePolicySatisfy(); validateStoragePolicySatisfy();
checkOperation(OperationCategory.WRITE);
try { try {
writeLock(); writeLock();
try { try {
@ -2389,13 +2430,6 @@ void satisfyStoragePolicy(String src, boolean logRetryCache)
private void validateStoragePolicySatisfy() private void validateStoragePolicySatisfy()
throws UnsupportedActionException, IOException { throws UnsupportedActionException, IOException {
// make sure storage policy is enabled, otherwise
// there is no need to satisfy storage policy.
if (!dir.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to satisfy storage policy since %s is set to false.",
DFS_STORAGE_POLICY_ENABLED_KEY));
}
// checks sps status // checks sps status
boolean disabled = (blockManager.getSPSManager() == null); boolean disabled = (blockManager.getSPSManager() == null);
if (disabled) { if (disabled) {
@ -2418,10 +2452,11 @@ private void validateStoragePolicySatisfy()
*/ */
void unsetStoragePolicy(String src) throws IOException { void unsetStoragePolicy(String src) throws IOException {
final String operationName = "unsetStoragePolicy"; final String operationName = "unsetStoragePolicy";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkStoragePolicyEnabled("unset storage policy", true);
final FSPermissionChecker pc = getPermissionChecker(); final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(operationName); FSPermissionChecker.setOperationType(operationName);
FileStatus auditStat;
try { try {
writeLock(); writeLock();
try { try {

View File

@ -3375,6 +3375,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.storage.policy.permissions.superuser-only</name>
<value>false</value>
<description>
Allow only superuser role to change the storage policy on files and
directories.
</description>
</property>
<property> <property>
<name>dfs.namenode.legacy-oiv-image.dir</name> <name>dfs.namenode.legacy-oiv-image.dir</name>
<value></value> <value></value>

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestStoragePolicyPermissionSettings {
private static final short REPL = 1;
private static final int SIZE = 128;
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
private static BlockStoragePolicySuite suite;
private static BlockStoragePolicy cold;
private static UserGroupInformation nonAdmin;
private static UserGroupInformation admin;
@BeforeClass
public static void clusterSetUp() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).build();
cluster.waitActive();
fs = cluster.getFileSystem();
suite = BlockStoragePolicySuite.createDefaultSuite();
cold = suite.getPolicy("COLD");
nonAdmin = UserGroupInformation.createUserForTesting(
"user1", new String[] {"test"});
admin = UserGroupInformation.createUserForTesting("user2",
new String[]{"supergroup"});
}
@AfterClass
public static void clusterShutdown() throws IOException {
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void setFSNameSystemFinalField(String field, boolean value)
throws NoSuchFieldException, IllegalAccessException {
Field f = FSNamesystem.class.getDeclaredField(field);
f.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
f.set(cluster.getNamesystem(), value);
}
private void setStoragePolicyPermissions(boolean isStoragePolicyEnabled,
boolean isStoragePolicySuperuserOnly)
throws NoSuchFieldException, IllegalAccessException {
setFSNameSystemFinalField("isStoragePolicyEnabled", isStoragePolicyEnabled);
setFSNameSystemFinalField("isStoragePolicySuperuserOnly",
isStoragePolicySuperuserOnly);
}
@Test
public void testStoragePolicyPermissionDefault() throws Exception {
Path foo = new Path("/foo");
DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
setStoragePolicyPermissions(true, false);
// Test default user fails
final FileSystem fileSystemNonAdmin =
DFSTestUtil.getFileSystemAs(nonAdmin, conf);
LambdaTestUtils.intercept(AccessControlException.class,
"Permission denied: user=user1",
"Only super user can set storage policy.",
() -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
// widen privilege
fs.setPermission(foo, new FsPermission("777"));
assertNotEquals(fs.getStoragePolicy(foo), cold);
LambdaTestUtils.eval(
() -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
assertEquals(fs.getStoragePolicy(foo), cold);
}
@Test
public void testStoragePolicyPermissionAdmins() throws Exception {
Path foo = new Path("/foo");
DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
fs.setPermission(foo, new FsPermission("777"));
// Test only super can set storage policies
setStoragePolicyPermissions(true, true);
final FileSystem fileSystemNonAdmin =
DFSTestUtil.getFileSystemAs(nonAdmin, conf);
LambdaTestUtils.intercept(AccessControlException.class,
"Access denied for user user1. Superuser privilege is required",
"Only super user can set storage policy.",
() -> fileSystemNonAdmin.setStoragePolicy(foo, cold.getName()));
final FileSystem fileSystemAdmin =
DFSTestUtil.getFileSystemAs(admin, conf);
assertNotEquals(fs.getStoragePolicy(foo), cold);
LambdaTestUtils.eval(
() -> fileSystemAdmin.setStoragePolicy(foo, cold.getName()));
assertEquals(fs.getStoragePolicy(foo), cold);
}
@Test
public void testStoragePolicyPermissionDisabled() throws Exception {
Path foo = new Path("/foo");
DFSTestUtil.createFile(fs, foo, SIZE, REPL, 0);
fs.setPermission(foo, new FsPermission("777"));
setStoragePolicyPermissions(false, false);
final FileSystem fileSystemAdmin =
DFSTestUtil.getFileSystemAs(admin, conf);
LambdaTestUtils.intercept(IOException.class,
"Failed to set storage policy " +
"since dfs.storage.policy.enabled is set to false.",
"Storage policy settings are disabled.",
() -> fileSystemAdmin.setStoragePolicy(foo, cold.getName()));
assertEquals(suite.getDefaultPolicy(), fs.getStoragePolicy(foo));
}
}