From a1c9425265d2c94bfc6afb39ab2c16b4ef9e874e Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Sat, 18 Apr 2015 23:20:45 +0530 Subject: [PATCH] HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for making it ready for transfer to DN (Contributed by Uma Maheswara Rao G) --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hadoop/hdfs/protocolPB/PBHelper.java | 137 +++++++++++++++++- .../blockmanagement/DatanodeDescriptor.java | 31 +--- .../blockmanagement/DatanodeManager.java | 4 +- .../protocol/BlockECRecoveryCommand.java | 80 +++++++++- .../server/protocol/DatanodeProtocol.java | 2 +- .../src/main/proto/DatanodeProtocol.proto | 8 + .../src/main/proto/erasurecoding.proto | 13 ++ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 88 +++++++++++ .../namenode/TestRecoverStripedBlocks.java | 10 +- 10 files changed, 335 insertions(+), 41 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 0ed61cd445..40517e79d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -87,3 +87,6 @@ startup. (Hui Zheng via szetszwo) HDFS-8167. BlockManager.addBlockCollectionWithCheck should check if the block is a striped block. (Hui Zheng via zhz). + + HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for + making it ready for transfer to DN (Uma Maheswara Rao G via vinayakumarb) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 9ca73ae6dd..c127b5f5c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -100,7 +101,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; +import org.apache.hadoop.hdfs.protocol.proto.*; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; @@ -121,6 +122,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; @@ -132,11 +134,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaOptionEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECZoneInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; @@ -184,7 +186,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StripedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto; @@ -204,8 +205,10 @@ import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -3150,4 +3153,132 @@ public static ECZoneInfo convertECZoneInfo(ECZoneInfoProto ecZoneInfoProto) { return new ECZoneInfo(ecZoneInfoProto.getDir(), convertECSchema(ecZoneInfoProto.getSchema())); } + + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + ExtendedBlock block = convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); + + StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); + StorageType[] convertStorageTypes = convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); + builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + + return builder.build(); + } + + private static List convertIntArray(short[] liveBlockIndices) { + List liveBlockIndicesList = new ArrayList(); + for (short s : liveBlockIndices) { + liveBlockIndicesList.add((int) s); + } + return liveBlockIndicesList; + } + + private static StorageTypesProto convertStorageTypesProto( + StorageType[] targetStorageTypes) { + StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); + for (StorageType storageType : targetStorageTypes) { + builder.addStorageTypes(convertStorageType(storageType)); + } + return builder.build(); + } + + private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { + StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); + for (String storageUuid : targetStorageIDs) { + builder.addStorageUuids(storageUuid); + } + return builder.build(); + } + + private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { + DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : dnInfos) { + builder.addDatanodes(convert(datanodeInfo)); + } + return builder.build(); + } + + private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { + List storageUuidsList = targetStorageUuidsProto + .getStorageUuidsList(); + String[] storageUuids = new String[storageUuidsList.size()]; + for (int i = 0; i < storageUuidsList.size(); i++) { + storageUuids[i] = storageUuidsList.get(i); + } + return storageUuids; + } + + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } + + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { + Collection blkECRecoveryInfos = new ArrayList(); + List blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 7ec71a27e0..35cc31b708 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -32,8 +32,8 @@ import java.util.Arrays; import com.google.common.annotations.VisibleForTesting; - import com.google.common.collect.ImmutableList; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -99,34 +100,6 @@ public static class BlockTargetPair { } } - /** Block and targets pair */ - @InterfaceAudience.Private - @InterfaceStability.Evolving - public static class BlockECRecoveryInfo { - public final ExtendedBlock block; - public final DatanodeDescriptor[] sources; - public final DatanodeStorageInfo[] targets; - public final short[] liveBlockIndices; - - BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources, - DatanodeStorageInfo[] targets, short[] liveBlockIndices) { - this.block = block; - this.sources = sources; - this.targets = targets; - this.liveBlockIndices = liveBlockIndices; - } - - @Override - public String toString() { - return new StringBuilder().append("BlockECRecoveryInfo(\n "). - append("Recovering ").append(block). - append(" From: ").append(Arrays.asList(sources)). - append(" To: ").append(Arrays.asList(targets)).append(")\n"). - append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)). - toString(); - } - } - /** A BlockTargetPair queue. */ private static class BlockQueue { private final Queue blockq = new LinkedList(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 8a78a0be99..6e84b3ed7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -34,12 +34,12 @@ import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.*; @@ -1442,7 +1442,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, List pendingECList = nodeinfo.getErasureCodeCommand(maxTransfers); if (pendingECList != null) { - cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC, + cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, pendingECList)); } //check block invalidation diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java index f7f02fdea4..9a387dd9e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java @@ -18,10 +18,15 @@ package org.apache.hadoop.hdfs.server.protocol; import com.google.common.base.Joiner; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import java.util.Arrays; import java.util.Collection; /** @@ -60,4 +65,77 @@ public String toString() { sb.append("\n)"); return sb.toString(); } + + /** Block and targets pair */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public static class BlockECRecoveryInfo { + private final ExtendedBlock block; + private final DatanodeInfo[] sources; + private DatanodeInfo[] targets; + private String[] targetStorageIDs; + private StorageType[] targetStorageTypes; + private final short[] liveBlockIndices; + + public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, + DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) { + this.block = block; + this.sources = sources; + this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo); + this.targetStorageIDs = DatanodeStorageInfo + .toStorageIDs(targetDnStorageInfo); + this.targetStorageTypes = DatanodeStorageInfo + .toStorageTypes(targetDnStorageInfo); + this.liveBlockIndices = liveBlockIndices; + } + + public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, + DatanodeInfo[] targets, String[] targetStorageIDs, + StorageType[] targetStorageTypes, short[] liveBlockIndices) { + this.block = block; + this.sources = sources; + this.targets = targets; + this.targetStorageIDs = targetStorageIDs; + this.targetStorageTypes = targetStorageTypes; + this.liveBlockIndices = liveBlockIndices; + } + + public ExtendedBlock getExtendedBlock() { + return block; + } + + public DatanodeInfo[] getSourceDnInfos() { + return sources; + } + + public DatanodeInfo[] getTargetDnInfos() { + return targets; + } + + public String[] getTargetStorageIDs() { + return targetStorageIDs; + } + + public StorageType[] getTargetStorageTypes() { + return targetStorageTypes; + } + + public short[] getLiveBlockIndices() { + return liveBlockIndices; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockECRecoveryInfo(\n ") + .append("Recovering ").append(block).append(" From: ") + .append(Arrays.asList(sources)).append(" To: [") + .append(Arrays.asList(targets)).append(")\n") + .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)) + .toString(); + } + } + + public Collection getECTasks() { + return this.ecTasks; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index b8ac165db5..1411fa9200 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,7 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks - final static int DNA_CODEC = 11; // uncache blocks + final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command /** * Register Datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 3083dc90f6..ac9ab460bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -33,6 +33,7 @@ package hadoop.hdfs.datanode; import "HAServiceProtocol.proto"; import "hdfs.proto"; +import "erasurecoding.proto"; /** * Information to identify a datanode to a namenode @@ -144,6 +145,13 @@ message RegisterCommandProto { // void } +/** + * Block Erasure coding recovery command + */ +message BlockECRecoveryCommandProto { + repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1; +} + /** * registration - Information of the datanode registering with the namenode */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto index d888f71ef9..59bd9497ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto @@ -21,6 +21,7 @@ option java_outer_classname = "ErasureCodingProtos"; option java_generate_equals_and_hash = true; package hadoop.hdfs; +import "hdfs.proto"; /** * ECSchema options entry @@ -86,4 +87,16 @@ message GetECZoneInfoRequestProto { message GetECZoneInfoResponseProto { optional ECZoneInfoProto ECZoneInfo = 1; +} + +/** + * Block erasure coding recovery info + */ +message BlockECRecoveryInfoProto { + required ExtendedBlockProto block = 1; + required DatanodeInfosProto sourceDnInfos = 2; + required DatanodeInfosProto targetDnInfos = 3; + required StorageUuidsProto targetStorageUuids = 4; + required StorageTypesProto targetStorageTypes = 5; + repeated uint32 liveBlockIndices = 6; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 4b42f4cd3e..4ec4ea5214 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.fs.permission.AclEntry; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; @@ -63,15 +66,20 @@ import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -639,4 +647,84 @@ public void testAclStatusProto() { .build(); Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s))); } + + @Test + public void testBlockECRecoveryCommand() { + DatanodeInfo[] dnInfos0 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s00")); + DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s01")); + DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { + targetDnInfos_0, targetDnInfos_1 }; + short[] liveBlkIndices0 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo( + new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, + liveBlkIndices0); + DatanodeInfo[] dnInfos1 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s02")); + DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s03")); + DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { + targetDnInfos_2, targetDnInfos_3 }; + short[] liveBlkIndices1 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo( + new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, + liveBlkIndices1); + List blkRecoveryInfosList = new ArrayList(); + blkRecoveryInfosList.add(blkECRecoveryInfo0); + blkRecoveryInfosList.add(blkECRecoveryInfo1); + BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList); + BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper + .convert(blkECRecoveryCmd); + blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto); + Iterator iterator = blkECRecoveryCmd.getECTasks() + .iterator(); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next()); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next()); + } + + private void assertBlockECRecoveryInfoEquals( + BlockECRecoveryInfo blkECRecoveryInfo1, + BlockECRecoveryInfo blkECRecoveryInfo2) { + assertEquals(blkECRecoveryInfo1.getExtendedBlock(), + blkECRecoveryInfo2.getExtendedBlock()); + + DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos(); + assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2); + + DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos(); + DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos(); + assertDnInfosEqual(targetDnInfos1, targetDnInfos2); + + String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs(); + String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs(); + assertEquals(targetStorageIDs1.length, targetStorageIDs2.length); + for (int i = 0; i < targetStorageIDs1.length; i++) { + assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]); + } + + short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices(); + short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices(); + for (int i = 0; i < liveBlockIndices1.length; i++) { + assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]); + } + } + + private void assertDnInfosEqual(DatanodeInfo[] dnInfos1, + DatanodeInfo[] dnInfos2) { + assertEquals(dnInfos1.length, dnInfos2.length); + for (int i = 0; i < dnInfos1.length; i++) { + compare(dnInfos1[i], dnInfos2[i]); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java index ea18c3ee66..ca4fbbc99c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java @@ -29,9 +29,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -115,10 +115,10 @@ public void testMissingStripedBlock() throws Exception { last.getNumberOfBlocksToBeErasureCoded()); List recovery = last.getErasureCodeCommand(numBlocks); for (BlockECRecoveryInfo info : recovery) { - assertEquals(1, info.targets.length); - assertEquals(last, info.targets[0].getDatanodeDescriptor()); - assertEquals(GROUP_SIZE - 1, info.sources.length); - assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length); + assertEquals(1, info.getTargetDnInfos().length); + assertEquals(last, info.getTargetDnInfos()[0]); + assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length); + assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length); } } }