HDFS-8345. Storage policy APIs must be exposed via the FileSystem interface. (Arpit Agarwal)

This commit is contained in:
Arpit Agarwal 2015-05-18 11:36:29 -07:00
parent 060c84ea86
commit a2190bf15d
16 changed files with 308 additions and 38 deletions

View File

@ -23,6 +23,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -1221,6 +1222,32 @@ public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
+ " doesn't support deleteSnapshot");
}
/**
* Set the storage policy for a given file or directory.
*
* @param path file or directory path.
* @param policyName the name of the target storage policy. The list
* of supported Storage policies can be retrieved
* via {@link #getAllStoragePolicies}.
*/
public void setStoragePolicy(final Path path, final String policyName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setStoragePolicy");
}
/**
* Retrieve all the storage policies supported by this file system.
*
* @return all storage policies supported by this filesystem.
* @throws IOException
*/
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAllStoragePolicies");
}
@Override //Object
public int hashCode() {
return myUri.hashCode();

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A storage policy specifies the placement of block replicas on specific
* storage types.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface BlockStoragePolicySpi {
/**
* Return the name of the storage policy. Policies are uniquely
* identified by name.
*
* @return the name of the storage policy.
*/
String getName();
/**
* Return the preferred storage types associated with this policy. These
* storage types are used sequentially for successive block replicas.
*
* @return preferred storage types used for placing block replicas.
*/
StorageType[] getStorageTypes();
/**
* Get the fallback storage types for creating new block replicas. Fallback
* storage types are used if the preferred storage types are not available.
*
* @return fallback storage types for new block replicas..
*/
StorageType[] getCreationFallbacks();
/**
* Get the fallback storage types for replicating existing block replicas.
* Fallback storage types are used if the preferred storage types are not
* available.
*
* @return fallback storage types for replicating existing block replicas.
*/
StorageType[] getReplicationFallbacks();
/**
* Returns true if the policy is inherit-only and cannot be changed for
* an existing file.
*
* @return true if the policy is inherit-only.
*/
boolean isCopyOnCreateFile();
}

View File

@ -24,6 +24,7 @@
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
@ -2665,4 +2666,36 @@ public Void next(final AbstractFileSystem fs, final Path p)
}
}.resolve(this, absF);
}
/**
* Set the storage policy for a given file or directory.
*
* @param path file or directory path.
* @param policyName the name of the target storage policy. The list
* of supported Storage policies can be retrieved
* via {@link #getAllStoragePolicies}.
*/
public void setStoragePolicy(final Path path, final String policyName)
throws IOException {
final Path absF = fixRelativePart(path);
new FSLinkResolver<Void>() {
@Override
public Void next(final AbstractFileSystem fs, final Path p)
throws IOException {
fs.setStoragePolicy(path, policyName);
return null;
}
}.resolve(this, absF);
}
/**
* Retrieve all the storage policies supported by this file system.
*
* @return all storage policies supported by this filesystem.
* @throws IOException
*/
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
return defaultFS.getAllStoragePolicies();
}
}

View File

@ -26,6 +26,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -2609,6 +2610,33 @@ public void removeXAttr(Path path, String name) throws IOException {
+ " doesn't support removeXAttr");
}
/**
* Set the storage policy for a given file or directory.
*
* @param src file or directory path.
* @param policyName the name of the target storage policy. The list
* of supported Storage policies can be retrieved
* via {@link #getAllStoragePolicies}.
* @throws IOException
*/
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setStoragePolicy");
}
/**
* Retrieve all the storage policies supported by this file system.
*
* @return all storage policies supported by this filesystem.
* @throws IOException
*/
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAllStoragePolicies");
}
// making it volatile to be able to do a double checked locking
private volatile static boolean FILE_SYSTEMS_LOADED = false;

View File

@ -21,6 +21,7 @@
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -619,4 +620,16 @@ public List<String> listXAttrs(Path path) throws IOException {
public void removeXAttr(Path path, String name) throws IOException {
fs.removeXAttr(path, name);
}
@Override
public void setStoragePolicy(Path src, String policyName)
throws IOException {
fs.setStoragePolicy(src, policyName);
}
@Override
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
return fs.getAllStoragePolicies();
}
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -397,4 +398,16 @@ public void deleteSnapshot(final Path path, final String snapshotName)
throws IOException {
myFs.deleteSnapshot(path, snapshotName);
}
@Override
public void setStoragePolicy(Path path, String policyName)
throws IOException {
myFs.setStoragePolicy(path, policyName);
}
@Override
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
return myFs.getAllStoragePolicies();
}
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -28,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -377,6 +379,18 @@ public void deleteSnapshot(Path snapshotDir, String snapshotName)
myFs.deleteSnapshot(fullPath(snapshotDir), snapshotName);
}
@Override
public void setStoragePolicy(Path path, String policyName)
throws IOException {
myFs.setStoragePolicy(fullPath(path), policyName);
}
@Override
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
return myFs.getAllStoragePolicies();
}
@Override
public void setVerifyChecksum(final boolean verifyChecksum)
throws IOException, UnresolvedLinkException {

View File

@ -740,6 +740,14 @@ public void deleteSnapshot(Path path, String snapshotName) throws IOException {
res.targetFileSystem.deleteSnapshot(res.remainingPath, snapshotName);
}
@Override
public void setStoragePolicy(final Path path, final String policyName)
throws IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(path), true);
res.targetFileSystem.setStoragePolicy(res.remainingPath, policyName);
}
/*
* An instance of this class represents an internal dir of the viewFs
* ie internal dir of the mount table.
@ -1070,5 +1078,11 @@ public void deleteSnapshot(Path path, String snapshotName)
checkPathIsSlash(path);
throw readOnlyMountTable("deleteSnapshot", path);
}
@Override
public void setStoragePolicy(Path path, String policyName)
throws IOException {
throw readOnlyMountTable("setStoragePolicy", path);
}
}
}

View File

@ -34,6 +34,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
@ -205,6 +206,12 @@ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
public AclStatus getAclStatus(Path path) throws IOException;
public void access(Path path, FsAction mode) throws IOException;
public void setStoragePolicy(Path src, String policyName)
throws IOException;
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException;
}
@Test

View File

@ -25,6 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +35,7 @@
* for the replicas of a block.
*/
@InterfaceAudience.Private
public class BlockStoragePolicy {
public class BlockStoragePolicy implements BlockStoragePolicySpi {
public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy
.class);
@ -239,18 +240,22 @@ public byte getId() {
return id;
}
@Override
public String getName() {
return name;
}
@Override
public StorageType[] getStorageTypes() {
return this.storageTypes;
}
@Override
public StorageType[] getCreationFallbacks() {
return this.creationFallbacks;
}
@Override
public StorageType[] getReplicationFallbacks() {
return this.replicationFallbacks;
}
@ -265,6 +270,7 @@ private static StorageType getFallback(EnumSet<StorageType> unavailables,
return null;
}
@Override
public boolean isCopyOnCreateFile() {
return copyOnCreateFile;
}

View File

@ -567,6 +567,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8157. Writes to RAM DISK reserve locked memory for block files.
(Arpit Agarwal)
HDFS-8345. Storage policy APIs must be exposed via the FileSystem
interface. (Arpit Agarwal)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -22,6 +22,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -465,6 +467,17 @@ public void access(Path path, final FsAction mode) throws IOException {
dfs.checkAccess(getUriPath(path), mode);
}
@Override
public void setStoragePolicy(Path path, String policyName) throws IOException {
dfs.setStoragePolicy(getUriPath(path), policyName);
}
@Override
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
return Arrays.asList(dfs.getStoragePolicies());
}
/**
* Renew an existing delegation token.
*

View File

@ -23,6 +23,8 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -32,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -532,6 +535,7 @@ public Boolean next(final FileSystem fs, final Path p)
* @param src The source path referring to either a directory or a file.
* @param policyName The name of the storage policy.
*/
@Override
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
statistics.incrementWriteOps(1);
@ -546,19 +550,24 @@ public Void doCall(final Path p)
@Override
public Void next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
((DistributedFileSystem) fs).setStoragePolicy(p, policyName);
fs.setStoragePolicy(p, policyName);
return null;
} else {
throw new UnsupportedOperationException(
"Cannot perform setStoragePolicy on a non-DistributedFileSystem: "
+ src + " -> " + p);
}
}
}.resolve(this, absF);
}
/** Get all the existing storage policies */
@Override
public Collection<BlockStoragePolicy> getAllStoragePolicies()
throws IOException {
return Arrays.asList(dfs.getStoragePolicies());
}
/**
* Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()}
* @return
* @throws IOException
*/
@Deprecated
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
statistics.incrementReadOps(1);
return dfs.getStoragePolicies();

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -152,8 +153,8 @@ void init() throws IOException {
}
private void initStoragePolicies() throws IOException {
BlockStoragePolicy[] policies = dispatcher.getDistributedFileSystem()
.getStoragePolicies();
Collection<BlockStoragePolicy> policies =
dispatcher.getDistributedFileSystem().getAllStoragePolicies();
for (BlockStoragePolicy policy : policies) {
this.blockStoragePolicies[policy.getId()] = policy;
}

View File

@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@ -97,7 +98,7 @@ public String getLongUsage() {
public int run(Configuration conf, List<String> args) throws IOException {
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
BlockStoragePolicy[] policies = dfs.getStoragePolicies();
Collection<BlockStoragePolicy> policies = dfs.getAllStoragePolicies();
System.out.println("Block Storage Policies:");
for (BlockStoragePolicy policy : policies) {
if (policy != null) {
@ -155,7 +156,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
System.out.println("The storage policy of " + path + " is unspecified");
return 0;
}
BlockStoragePolicy[] policies = dfs.getStoragePolicies();
Collection<BlockStoragePolicy> policies = dfs.getAllStoragePolicies();
for (BlockStoragePolicy p : policies) {
if (p.getId() == storagePolicyId) {
System.out.println("The storage policy of " + path + ":\n" + p);

View File

@ -25,7 +25,9 @@
import java.util.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -1150,30 +1152,6 @@ public void testChooseTargetWithTopology() throws Exception {
Assert.assertEquals(3, targets.length);
}
/**
* Test getting all the storage policies from the namenode
*/
@Test
public void testGetAllStoragePolicies() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
BlockStoragePolicy[] policies = fs.getStoragePolicies();
Assert.assertEquals(6, policies.length);
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
policies[0].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
policies[1].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
policies[2].toString());
} finally {
IOUtils.cleanup(null, fs);
cluster.shutdown();
}
}
@Test
public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
//HDFS8219
@ -1217,4 +1195,42 @@ public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
cluster.shutdown();
}
}
/**
* Verify that {@link FileSystem#getAllStoragePolicies} returns all
* known storage policies for DFS.
*
* @throws IOException
*/
@Test
public void testGetAllStoragePoliciesFromFs() throws IOException {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION)
.storageTypes(
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
.build();
try {
cluster.waitActive();
// Get policies via {@link FileSystem#getAllStoragePolicies}
Set<String> policyNamesSet1 = new HashSet<>();
for (BlockStoragePolicySpi policy :
cluster.getFileSystem().getAllStoragePolicies()) {
policyNamesSet1.add(policy.getName());
}
// Get policies from the default BlockStoragePolicySuite.
BlockStoragePolicySuite suite = BlockStoragePolicySuite.createDefaultSuite();
Set<String> policyNamesSet2 = new HashSet<>();
for (BlockStoragePolicy policy : suite.getAllPolicies()) {
policyNamesSet2.add(policy.getName());
}
// Ensure that we got the same set of policies in both cases.
Assert.assertTrue(Sets.difference(policyNamesSet1, policyNamesSet2).isEmpty());
Assert.assertTrue(Sets.difference(policyNamesSet2, policyNamesSet1).isEmpty());
} finally {
cluster.shutdown();
}
}
}