HDFS-6670. Add block storage policy support with default HOT, WARM and COLD policies.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1610529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-07-14 21:11:22 +00:00
parent 6f41baa623
commit 4f1fae88f8
9 changed files with 432 additions and 2 deletions

View File

@ -5,6 +5,9 @@ HDFS-6584: Archival Storage
HDFS-6677. Change INodeFile and FSImage to support storage policy ID. HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
(szetszwo) (szetszwo)
HDFS-6670. Add block storage policy support with default HOT, WARM and COLD
policies. (szetszwo)
Trunk (Unreleased) Trunk (Unreleased)
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* A block storage policy describes how to select the storage types
* for the replicas of a block.
*/
@InterfaceAudience.Private
public class BlockStoragePolicy {
public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class);
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
= "dfs.block.storage.policies";
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
= "dfs.block.storage.policy.";
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
= "dfs.block.storage.policy.creation-fallback.";
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
= "dfs.block.storage.policy.replication-fallback.";
public static final int ID_BIT_LENGTH = 4;
public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1;
/** A 4-bit policy ID */
private final byte id;
/** Policy name */
private final String name;
/** The storage types to store the replicas of a new block. */
private final StorageType[] storageTypes;
/** The fallback storage type for block creation. */
private final StorageType[] creationFallbacks;
/** The fallback storage type for replication. */
private final StorageType[] replicationFallbacks;
BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
this.id = id;
this.name = name;
this.storageTypes = storageTypes;
this.creationFallbacks = creationFallbacks;
this.replicationFallbacks = replicationFallbacks;
}
/**
* @return a list of {@link StorageType}s for storing the replicas of a block.
*/
StorageType[] getStoragteTypes(short replication) {
final StorageType[] types = new StorageType[replication];
int i = 0;
for(; i < types.length && i < storageTypes.length; i++) {
types[i] = storageTypes[i];
}
final StorageType last = storageTypes[storageTypes.length - 1];
for(; i < types.length; i++) {
types[i] = last;
}
return types;
}
/** @return the fallback {@link StorageType} for creation. */
StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, creationFallbacks);
}
/** @return the fallback {@link StorageType} for replication. */
StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, replicationFallbacks);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{" + name + ":" + id
+ ", storageTypes=" + Arrays.asList(storageTypes)
+ ", creationFallbacks=" + Arrays.asList(creationFallbacks)
+ ", replicationFallbacks=" + Arrays.asList(replicationFallbacks);
}
private static StorageType getFallback(EnumSet<StorageType> unavailables,
StorageType[] fallbacks) {
for(StorageType fb : fallbacks) {
if (!unavailables.contains(fb)) {
return fb;
}
}
return null;
}
private static byte parseID(String string) {
final byte id = Byte.parseByte(string);
if (id < 1) {
throw new IllegalArgumentException(
"Invalid block storage policy ID: id = " + id + " < 1");
}
if (id > 15) {
throw new IllegalArgumentException(
"Invalid block storage policy ID: id = " + id + " > MAX = " + ID_MAX);
}
return id;
}
private static StorageType[] parseStorageTypes(String[] strings) {
if (strings == null) {
return StorageType.EMPTY_ARRAY;
}
final StorageType[] types = new StorageType[strings.length];
for(int i = 0; i < types.length; i++) {
types[i] = StorageType.valueOf(strings[i].trim().toUpperCase());
}
return types;
}
private static StorageType[] readStorageTypes(byte id, String keyPrefix,
Configuration conf) {
final String[] values = conf.getStrings(keyPrefix + id);
return parseStorageTypes(values);
}
public static BlockStoragePolicy readBlockStoragePolicy(byte id, String name,
Configuration conf) {
final StorageType[] storageTypes = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf);
final StorageType[] creationFallbacks = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf);
final StorageType[] replicationFallbacks = readStorageTypes(id,
DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf);
return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks,
replicationFallbacks);
}
public static BlockStoragePolicy[] readBlockStoragePolicies(
Configuration conf) {
final BlockStoragePolicy[] policies = new BlockStoragePolicy[ID_MAX + 1];
final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY);
for(String v : values) {
v = v.trim();
final int i = v.indexOf(':');
final String name = v.substring(0, i);
final byte id = parseID(v.substring(i + 1));
if (policies[id] != null) {
throw new IllegalArgumentException(
"Policy duplication: ID " + id + " appears more than once in "
+ DFS_BLOCK_STORAGE_POLICIES_KEY);
}
policies[id] = readBlockStoragePolicy(id, name, conf);
LOG.info(policies[id]);
}
return policies;
}
}

View File

@ -421,6 +421,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
public static final int DFS_REPLICATION_MAX_DEFAULT = 512; public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
public static final String DFS_BLOCK_STORAGE_POLICIES_KEY
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY;
public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX;
public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX;
public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX
= BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX;
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
public static final int DFS_DF_INTERVAL_DEFAULT = 60000; public static final int DFS_DF_INTERVAL_DEFAULT = 60000;
public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec"; public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";

View File

@ -29,7 +29,10 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
public enum StorageType { public enum StorageType {
DISK, DISK,
SSD; SSD,
ARCHIVE;
public static final StorageType DEFAULT = DISK; public static final StorageType DEFAULT = DISK;
public static final StorageType[] EMPTY_ARRAY = {};
} }

View File

@ -1612,6 +1612,8 @@ private static StorageTypeProto convertStorageType(
return StorageTypeProto.DISK; return StorageTypeProto.DISK;
case SSD: case SSD:
return StorageTypeProto.SSD; return StorageTypeProto.SSD;
case ARCHIVE:
return StorageTypeProto.ARCHIVE;
default: default:
throw new IllegalStateException( throw new IllegalStateException(
"BUG: StorageType not found, type=" + type); "BUG: StorageType not found, type=" + type);
@ -1640,6 +1642,8 @@ private static StorageType convertType(StorageTypeProto type) {
return StorageType.DISK; return StorageType.DISK;
case SSD: case SSD:
return StorageType.SSD; return StorageType.SSD;
case ARCHIVE:
return StorageType.ARCHIVE;
default: default:
throw new IllegalStateException( throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type); "BUG: StorageTypeProto not found, type=" + type);

View File

@ -134,6 +134,7 @@ message FsPermissionProto {
enum StorageTypeProto { enum StorageTypeProto {
DISK = 1; DISK = 1;
SSD = 2; SSD = 2;
ARCHIVE = 3;
} }
/** /**

View File

@ -0,0 +1,117 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Do not modify this file directly. Instead, copy entries that you wish -->
<!-- to modify from this file into blockStoragePolicy-site.xml and change -->
<!-- there. If blockStoragePolicy-site.xml does not exist, create it. -->
<configuration>
<property>
<name>dfs.block.storage.policies</name>
<value>HOT:12, WARM:8, COLD:4</value>
<description>
A list of block storage policy names and IDs. The syntax is
NAME_1:ID_1, NAME_2:ID_2, ..., NAME_n:ID_n
where ID is an integer in the range [1,15] and NAME is case insensitive.
</description>
</property>
<!-- Block Storage Policy HOT:12 -->
<property>
<name>dfs.block.storage.policy.12</name>
<value>DISK</value>
<description>
A list of storage types for storing the block replicas such as
STORAGE_TYPE_1, STORAGE_TYPE_2, ..., STORAGE_TYPE_n
When creating a block, the i-th replica is stored using i-th storage type
for i less than or equal to n, and
the j-th replica is stored using n-th storage type for j greater than n.
The value cannot specified as an empty list.
Examples:
DISK : all replicas stored using DISK.
DISK, ARCHIVE : the first replica is stored using DISK and all the
remaining are stored using ARCHIVE.
</description>
</property>
<property>
<name>dfs.block.storage.policy.creation-fallback.12</name>
<value></value>
<description>
A list of storage types for creation fallback storage.
STORAGE_TYPE_1, STORAGE_TYPE_2, ..., STORAGE_TYPE_n
When creating a block, if a particular storage type specified in the policy
is unavailable, the fallback STORAGE_TYPE_1 is used. Further, if
STORAGE_TYPE_i is also unavailable, the fallback STORAGE_TYPE_(i+1) is used.
In case that all fallback storages are unavailabe, the block will be created
with number of replicas less than the specified replication factor.
An empty list indicates that there is no fallback storage.
</description>
</property>
<property>
<name>dfs.block.storage.policy.replication-fallback.12</name>
<value>ARCHIVE</value>
<description>
Similar to dfs.block.storage.policy.creation-fallback.x but for replication.
</description>
</property>
<!-- Block Storage Policy WARM:8 -->
<property>
<name>dfs.block.storage.policy.8</name>
<value>DISK, ARCHIVE</value>
</property>
<property>
<name>dfs.block.storage.policy.creation-fallback.8</name>
<value>DISK, ARCHIVE</value>
</property>
<property>
<name>dfs.block.storage.policy.replication-fallback.8</name>
<value>DISK, ARCHIVE</value>
</property>
<!-- Block Storage Policy COLD:4 -->
<property>
<name>dfs.block.storage.policy.4</name>
<value>ARCHIVE</value>
</property>
<property>
<name>dfs.block.storage.policy.creation-fallback.4</name>
<value></value>
</property>
<property>
<name>dfs.block.storage.policy.replication-fallback.4</name>
<value></value>
</property>
</configuration>

View File

@ -22,7 +22,8 @@
<!-- wish to modify from this file into hdfs-site.xml and change them --> <!-- wish to modify from this file into hdfs-site.xml and change them -->
<!-- there. If hdfs-site.xml does not already exist, create it. --> <!-- there. If hdfs-site.xml does not already exist, create it. -->
<configuration> <configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<xi:include href="blockStoragePolicy-default.xml" />
<property> <property>
<name>hadoop.hdfs.configuration.version</name> <name>hadoop.hdfs.configuration.version</name>

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
/** Test {@link BlockStoragePolicy} */
public class TestBlockStoragePolicy {
static final EnumSet<StorageType> none = EnumSet.noneOf(StorageType.class);
static final EnumSet<StorageType> archive = EnumSet.of(StorageType.ARCHIVE);
static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE);
static {
HdfsConfiguration.init();
}
@Test
public void testDefaultPolicies() throws Exception {
final byte COLD = (byte)4;
final byte WARM = (byte)8;
final byte HOT = (byte)12;
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
expectedPolicyStrings.put(COLD,
"BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]");
expectedPolicyStrings.put(WARM,
"BlockStoragePolicy{WARM:8, storageTypes=[DISK, ARCHIVE], creationFallbacks=[DISK, ARCHIVE], replicationFallbacks=[DISK, ARCHIVE]");
expectedPolicyStrings.put(HOT,
"BlockStoragePolicy{HOT:12, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]");
final Configuration conf = new Configuration();
final BlockStoragePolicy[] policies = BlockStoragePolicy.readBlockStoragePolicies(conf);
for(int i = 0; i < policies.length; i++) {
if (policies[i] != null) {
final String s = policies[i].toString();
Assert.assertEquals(expectedPolicyStrings.get((byte)i), s);
}
}
{ // check Cold policy
final BlockStoragePolicy cold = policies[COLD];
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = cold.getStoragteTypes(replication);
assertStorageType(computed, replication, StorageType.ARCHIVE);
}
assertCreationFallback(cold, null, null, null);
assertReplicationFallback(cold, null, null, null);
}
{ // check Warm policy
final BlockStoragePolicy warm = policies[WARM];
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = warm.getStoragteTypes(replication);
assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE);
}
assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
assertReplicationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
}
{ // check Hot policy
final BlockStoragePolicy hot = policies[HOT];
for(short replication = 1; replication < 6; replication++) {
final StorageType[] computed = hot.getStoragteTypes(replication);
assertStorageType(computed, replication, StorageType.DISK);
}
assertCreationFallback(hot, null, null, null);
assertReplicationFallback(hot, StorageType.ARCHIVE, null, StorageType.ARCHIVE);
}
}
static void assertStorageType(StorageType[] computed, short replication,
StorageType... answers) {
Assert.assertEquals(replication, computed.length);
final StorageType last = answers[answers.length - 1];
for(int i = 0; i < computed.length; i++) {
final StorageType expected = i < answers.length? answers[i]: last;
Assert.assertEquals(expected, computed[i]);
}
}
static void assertCreationFallback(BlockStoragePolicy policy, StorageType noneExpected,
StorageType archiveExpected, StorageType diskExpected) {
Assert.assertEquals(noneExpected, policy.getCreationFallback(none));
Assert.assertEquals(archiveExpected, policy.getCreationFallback(archive));
Assert.assertEquals(diskExpected, policy.getCreationFallback(disk));
Assert.assertEquals(null, policy.getCreationFallback(both));
}
static void assertReplicationFallback(BlockStoragePolicy policy, StorageType noneExpected,
StorageType archiveExpected, StorageType diskExpected) {
Assert.assertEquals(noneExpected, policy.getReplicationFallback(none));
Assert.assertEquals(archiveExpected, policy.getReplicationFallback(archive));
Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk));
Assert.assertEquals(null, policy.getReplicationFallback(both));
}
}