HDFS-4985. Add storage type to the protocol and expose it in block report and block locations

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1516323 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2013-08-21 23:07:29 +00:00
parent 075995a201
commit 5e27288c1e
7 changed files with 206 additions and 19 deletions

View File: StorageType.java (new file)

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Defines the types of supported storage media. The default storage
 * medium is assumed to be DISK.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum StorageType {
  DISK(1),
  SSD(2);

  public static StorageType DEFAULT = DISK;

  private final int storageType;

  StorageType(int medium) {
    storageType = medium;
  }

  public int getStorageType() {
    return this.storageType;
  }
}
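
For illustration only (not part of this commit): a minimal sketch of consuming the new enum. The StorageTypeDemo class and describe() helper are hypothetical.

import org.apache.hadoop.hdfs.StorageType;

public class StorageTypeDemo {
  // Pairs the enum constant with its numeric medium id (DISK=1, SSD=2).
  static String describe(StorageType t) {
    return t + " (medium id " + t.getStorageType() + ")";
  }

  public static void main(String[] args) {
    System.out.println(describe(StorageType.DEFAULT)); // DISK (medium id 1)
    System.out.println(describe(StorageType.SSD));     // SSD (medium id 2)
  }
}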

View File: LocatedBlock.java

@@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -34,6 +35,8 @@ public class LocatedBlock {
private ExtendedBlock b;
private long offset; // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
// Storage type for each replica, if reported.
private StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
@@ -50,6 +53,12 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
boolean corrupt) {
this(b, locs, null, startOffset, corrupt);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
StorageType[] storageTypes, long startOffset,
boolean corrupt) {
this.b = b;
this.offset = startOffset;
this.corrupt = corrupt;
@@ -58,6 +67,7 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
} else {
this.locs = locs;
}
this.storageTypes = storageTypes;
}
public Token<BlockTokenIdentifier> getBlockToken() {
@@ -75,6 +85,14 @@ public ExtendedBlock getBlock() {
public DatanodeInfo[] getLocations() {
return locs;
}
public void setStorageTypes(StorageType[] storageTypes) {
this.storageTypes = storageTypes;
}
public StorageType[] getStorageTypes() {
return storageTypes;
}
public long getStartOffset() {
return offset;

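For illustration only (not part of this diff): a sketch of how a client might pair the new storage types with replica locations. The ReplicaMediaDemo class and printReplicaMedia() helper are hypothetical; note that getStorageTypes() may return null when media were not reported.

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

class ReplicaMediaDemo {
  static void printReplicaMedia(LocatedBlock lb) {
    DatanodeInfo[] locs = lb.getLocations();
    StorageType[] types = lb.getStorageTypes(); // may be null
    for (int i = 0; i < locs.length; i++) {
      // Fall back to the default medium when types were not reported.
      StorageType t = (types != null && i < types.length)
          ? types[i] : StorageType.DEFAULT;
      System.out.println(locs[i] + " -> " + t);
    }
  }
}
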
View File: PBHelper.java

@@ -25,9 +25,11 @@
import java.util.EnumSet;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
@@ -92,6 +94,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
@@ -566,6 +569,12 @@ public static LocatedBlockProto convert(LocatedBlock b) {
for (int i = 0; i < locs.length; i++) {
builder.addLocs(i, PBHelper.convert(locs[i]));
}
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
for (int i = 0; i < storageTypes.length; ++i) {
builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
}
}
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -578,8 +587,22 @@ public static LocatedBlock convert(LocatedBlockProto proto) {
for (int i = 0; i < locs.size(); i++) {
targets[i] = PBHelper.convert(locs.get(i));
}
List<StorageTypeProto> storageTypesList = proto.getStorageTypesList();
StorageType[] storageTypes = new StorageType[locs.size()];
// The media should correspond to targets 1:1. If not then
// ignore the media information (left as default).
if ((storageTypesList != null) &&
(storageTypesList.size() == locs.size())) {
for (int i = 0; i < storageTypesList.size(); ++i) {
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
}
}
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-proto.getOffset(), proto.getCorrupt());
+storageTypes, proto.getOffset(), proto.getCorrupt());
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;
}
@@ -1327,11 +1350,12 @@ public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
public static DatanodeStorageProto convert(DatanodeStorage s) {
return DatanodeStorageProto.newBuilder()
-.setState(PBHelper.convert(s.getState()))
+.setState(PBHelper.convertState(s.getState()))
+.setStorageType(PBHelper.convertStorageType(s.getStorageType()))
.setStorageID(s.getStorageID()).build();
}
-private static StorageState convert(State state) {
+private static StorageState convertState(State state) {
switch(state) {
case READ_ONLY:
return StorageState.READ_ONLY;
@@ -1341,11 +1365,34 @@ private static StorageState convert(State state) {
}
}
-public static DatanodeStorage convert(DatanodeStorageProto s) {
-return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState()));
private static StorageTypeProto convertStorageType(
StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
case SSD:
return StorageTypeProto.SSD;
default:
Preconditions.checkState(
false,
"Failed to update StorageTypeProto with new StorageType " +
type.toString());
return StorageTypeProto.DISK;
}
}
-private static State convert(StorageState state) {
public static DatanodeStorage convert(DatanodeStorageProto s) {
if (s.hasStorageType()) {
return new DatanodeStorage(s.getStorageID(),
PBHelper.convertState(s.getState()),
PBHelper.convertType(s.getStorageType()));
} else {
return new DatanodeStorage(s.getStorageID(),
PBHelper.convertState(s.getState()));
}
}
private static State convertState(StorageState state) {
switch(state) {
case READ_ONLY:
return DatanodeStorage.State.READ_ONLY;
@@ -1355,6 +1402,17 @@ private static State convert(StorageState state) {
}
}
private static StorageType convertType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
case SSD:
return StorageType.SSD;
default:
return StorageType.DEFAULT;
}
}
public static StorageReportProto convert(StorageReport r) {
return StorageReportProto.newBuilder()
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())

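For illustration only (not part of this diff): a sketch of the round trip these converters enable, in the spirit of the TestPBHelper case added below. The StorageConversionDemo class and roundTripDemo() helper are hypothetical.

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

class StorageConversionDemo {
  static void roundTripDemo() {
    DatanodeStorage before = new DatanodeStorage(
        "storage-1", DatanodeStorage.State.NORMAL, StorageType.SSD);
    DatanodeStorageProto proto = PBHelper.convert(before);
    DatanodeStorage after = PBHelper.convert(proto);
    assert after.getStorageType() == StorageType.SSD;
    // A proto from an older sender, with storageType unset, decodes to
    // StorageType.DEFAULT via the hasStorageType() branch above.
  }
}
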
View File: DatanodeStorage.java

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.hdfs.StorageType;
/**
* Class captures information of a storage in Datanode.
*/
@@ -29,18 +31,26 @@ public enum State {
private final String storageID;
private final State state;
private final StorageType storageType;
/**
-* Create a storage with {@link State#NORMAL}.
+* Create a storage with {@link State#NORMAL} and
+* {@link org.apache.hadoop.hdfs.StorageType#DEFAULT}.
*
* @param storageID
*/
public DatanodeStorage(String storageID) {
-this(storageID, State.NORMAL);
+this(storageID, State.NORMAL, StorageType.DEFAULT);
}
public DatanodeStorage(String sid, State s) {
-storageID = sid;
-state = s;
+this(sid, s, StorageType.DEFAULT);
}
public DatanodeStorage(String sid, State s, StorageType sm) {
this.storageID = sid;
this.state = s;
this.storageType = sm;
}
public String getStorageID() {
@@ -50,4 +60,8 @@ public String getStorageID() {
public State getState() {
return state;
}
public StorageType getStorageType() {
return storageType;
}
}

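For illustration only (not part of this diff): how the constructor chain defaults the medium. The DatanodeStorageDemo class and the storage IDs below are hypothetical.

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

class DatanodeStorageDemo {
  static void constructorDefaults() {
    // All three constructors funnel into the three-argument form.
    DatanodeStorage a = new DatanodeStorage("s1");      // NORMAL state, DEFAULT (DISK) medium
    DatanodeStorage b = new DatanodeStorage("s2",
        DatanodeStorage.State.READ_ONLY);               // DEFAULT (DISK) medium
    DatanodeStorage c = new DatanodeStorage("s3",
        DatanodeStorage.State.NORMAL, StorageType.SSD); // explicit medium
  }
}
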
View File: DatanodeProtocol.proto

@@ -55,6 +55,7 @@ message DatanodeStorageProto {
required string storageID = 1; // Unique identifier for the storage
optional StorageState state = 2 [default = NORMAL];
optional StorageTypeProto storageType = 3;
}
/**

View File: hdfs.proto

@@ -113,6 +113,13 @@ message FsPermissionProto {
required uint32 perm = 1; // Actually a short - only 16bits used
}
/**
* Types of recognized storage media.
*/
enum StorageTypeProto {
DISK = 1;
SSD = 2;
}
/**
* A LocatedBlock gives information about a block and its location.
@@ -126,6 +133,7 @@ message LocatedBlockProto {
// their locations are not part of this object
required hadoop.common.TokenProto blockToken = 5;
repeated StorageTypeProto storageTypes = 6;
}
message DataEncryptionKeyProto {

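For illustration only (not part of this diff): because storageTypes is a repeated field, an old writer simply sends zero entries and stays wire-compatible. A hedged decode-side guard in the spirit of PBHelper's 1:1 check; the WireCompatDemo class and mediaReported() helper are hypothetical.

import java.util.List;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;

class WireCompatDemo {
  // True only when the reported media pair 1:1 with replica locations.
  static boolean mediaReported(LocatedBlockProto proto) {
    List<StorageTypeProto> types = proto.getStorageTypesList();
    return !types.isEmpty() && types.size() == proto.getLocsCount();
  }
}
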
View File: TestPBHelper.java

@@ -17,13 +17,16 @@
*/
package org.apache.hadoop.hdfs.protocolPB;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -35,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
@@ -57,17 +61,9 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
@@ -155,6 +151,12 @@ void compare(DatanodeID dn, DatanodeID dn2) {
assertEquals(dn.getIpcPort(), dn2.getIpcPort());
}
void compare(DatanodeStorage dns1, DatanodeStorage dns2) {
assertThat(dns2.getStorageID(), is(dns1.getStorageID()));
assertThat(dns2.getState(), is(dns1.getState()));
assertThat(dns2.getStorageType(), is(dns1.getStorageType()));
}
@Test
public void testConvertBlock() {
Block b = new Block(1, 100, 3);
@@ -428,6 +430,29 @@ private LocatedBlock createLocatedBlock() {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL)
};
StorageType[] media = {
StorageType.DISK,
StorageType.SSD,
StorageType.DISK
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
lb.setBlockToken(new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service")));
lb.setStorageTypes(media);
return lb;
}
private LocatedBlock createLocatedBlockNoStorageMedia() {
DatanodeInfo[] dnInfos = {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1",
AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL)
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
lb.setBlockToken(new Token<BlockTokenIdentifier>(
@@ -444,6 +469,14 @@ public void testConvertLocatedBlock() {
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockNoStorageMedia() {
LocatedBlock lb = createLocatedBlockNoStorageMedia();
LocatedBlockProto lbProto = PBHelper.convert(lb);
LocatedBlock lb2 = PBHelper.convert(lbProto);
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockList() {
ArrayList<LocatedBlock> lbl = new ArrayList<LocatedBlock>();
@@ -487,6 +520,16 @@ public void testConvertDatanodeRegistration() {
compare(reg, reg2);
assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
}
@Test
public void TestConvertDatanodeStorage() {
DatanodeStorage dns1 = new DatanodeStorage(
"id1", DatanodeStorage.State.NORMAL, StorageType.SSD);
DatanodeStorageProto proto = PBHelper.convert(dns1);
DatanodeStorage dns2 = PBHelper.convert(proto);
compare(dns1, dns2);
}
@Test
public void testConvertBlockCommand() {