diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 77272e7adb..faec023add 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -164,3 +164,5 @@
 
     HDFS-8281. Erasure Coding: implement parallel stateful reading for striped
     layout. (jing9)
+
+    HDFS-8137. Send the EC schema to DataNode via EC encoding/recovering command. (umamahesh)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 3cd3e03d1b..e230232767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3191,8 +3191,10 @@ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
       liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
     }
 
+    ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
+
     return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
-        targetStorageUuids, convertStorageTypes, liveBlkIndices);
+        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema);
   }
 
   public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
@@ -3217,6 +3219,8 @@ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
     short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
     builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
 
+    builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
+
     return builder.build();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6657e5b369..a3d75b5ae7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -66,7 +66,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -84,7 +83,10 @@
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -94,6 +96,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1554,10 +1557,25 @@ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
         if (block.isStriped()) {
           assert rw instanceof ErasureCodingWork;
           assert rw.targets.length > 0;
+          String src = block.getBlockCollection().getName();
+          ECSchema ecSchema = null;
+          try {
+            ecSchema = namesystem.getECSchemaForPath(src);
+          } catch (IOException e) {
+            blockLog
+                .warn("Failed to get the EC schema for the file {}", src);
+          }
+          if (ecSchema == null) {
+            blockLog.warn("No EC schema found for the file {}. "
+                + "So cannot proceed with recovery", src);
+            // TODO: we may have to revisit this later to find a better way to
+            // handle this case.
+            continue;
+          }
           rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
               new ExtendedBlock(namesystem.getBlockPoolId(), block),
               rw.srcNodes, rw.targets,
-              ((ErasureCodingWork) rw).liveBlockIndicies);
+              ((ErasureCodingWork) rw).liveBlockIndicies, ecSchema);
         } else {
           rw.srcNodes[0].addBlockToBeReplicated(block, targets);
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 35cc31b708..83d33035bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -29,7 +29,6 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.Arrays;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -51,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
@@ -608,15 +608,15 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
   /**
    * Store block erasure coding work.
    */
-  void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
-      DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
-    assert(block != null && sources != null && sources.length > 0);
+  void addBlockToBeErasureCoded(ExtendedBlock block,
+      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
+      short[] liveBlockIndices, ECSchema ecSchema) {
+    assert (block != null && sources != null && sources.length > 0);
     BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
-        liveBlockIndices);
+        liveBlockIndices, ecSchema);
     erasurecodeBlocks.offer(task);
-    BlockManager.LOG.debug("Adding block recovery task " + task +
-        "to " + getName() + ", current queue size is " +
-        erasurecodeBlocks.size());
+    BlockManager.LOG.debug("Adding block recovery task " + task + " to " +
+        getName() + ", current queue size is " + erasurecodeBlocks.size());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3fa88184a9..79dd3d7803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7581,25 +7581,9 @@ void createErasureCodingZone(final String srcArg, final ECSchema schema,
    */
   ECInfo getErasureCodingInfo(String src) throws AccessControlException,
       UnresolvedLinkException, IOException {
-    checkOperation(OperationCategory.READ);
-    final byte[][] pathComponents = FSDirectory
-        .getPathComponentsForReservedPath(src);
-    final FSPermissionChecker pc = getPermissionChecker();
-    readLock();
-    try {
-      checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath(src, true);
-      if (isPermissionEnabled) {
-        dir.checkPathAccess(pc, iip, FsAction.READ);
-      }
-      // Get schema set for the zone
-      ECSchema schema = dir.getECSchema(iip);
-      if (schema != null) {
-        return new ECInfo(src, schema);
-      }
-    } finally {
-      readUnlock();
+    ECSchema schema = getECSchemaForPath(src);
+    if (schema != null) {
+      return new ECInfo(src, schema);
     }
     return null;
   }
@@ -7841,5 +7825,26 @@ private static void enableAsyncAuditLog() {
     }
   }
 
+  @Override
+  public ECSchema getECSchemaForPath(String src) throws IOException {
+    checkOperation(OperationCategory.READ);
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(src);
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
+      if (isPermissionEnabled) {
+        dir.checkPathAccess(pc, iip, FsAction.READ);
+      }
+      // Get schema set for the zone
+      return dir.getECSchema(iip);
+    } finally {
+      readUnlock();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 4695c3f7db..e6c7fc0f17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -47,4 +49,14 @@ public interface Namesystem extends RwLock, SafeMode {
   public void checkOperation(OperationCategory read) throws StandbyException;
 
   public boolean isInSnapshot(BlockCollection bc);
+
+  /**
+   * Gets the ECSchema for the specified path.
+   *
+   * @param src
+   *          - path
+   * @return ECSchema
+   * @throws IOException
+   */
+  public ECSchema getECSchemaForPath(String src) throws IOException;
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index 9a387dd9e0..61e49e933d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -76,9 +77,11 @@ public static class BlockECRecoveryInfo {
     private String[] targetStorageIDs;
     private StorageType[] targetStorageTypes;
     private final short[] liveBlockIndices;
+    private final ECSchema ecSchema;
 
     public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
-        DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) {
+        DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
+        ECSchema ecSchema) {
       this.block = block;
       this.sources = sources;
       this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
@@ -87,17 +90,20 @@ public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
       this.targetStorageTypes = DatanodeStorageInfo
           .toStorageTypes(targetDnStorageInfo);
       this.liveBlockIndices = liveBlockIndices;
+      this.ecSchema = ecSchema;
     }
 
     public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
         DatanodeInfo[] targets, String[] targetStorageIDs,
-        StorageType[] targetStorageTypes, short[] liveBlockIndices) {
+        StorageType[] targetStorageTypes, short[] liveBlockIndices,
+        ECSchema ecSchema) {
       this.block = block;
       this.sources = sources;
       this.targets = targets;
       this.targetStorageIDs = targetStorageIDs;
       this.targetStorageTypes = targetStorageTypes;
       this.liveBlockIndices = liveBlockIndices;
+      this.ecSchema = ecSchema;
     }
 
     public ExtendedBlock getExtendedBlock() {
@@ -123,6 +129,10 @@ public StorageType[] getTargetStorageTypes() {
     public short[] getLiveBlockIndices() {
       return liveBlockIndices;
     }
+
+    public ECSchema getECSchema() {
+      return ecSchema;
+    }
 
     @Override
     public String toString() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
index 59bd9497ef..702f6fd44c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
@@ -99,4 +99,5 @@ message BlockECRecoveryInfoProto {
   required StorageUuidsProto targetStorageUuids = 4;
   required StorageTypesProto targetStorageTypes = 5;
   repeated uint32 liveBlockIndices = 6;
+  required ECSchemaProto ecSchema = 7;
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 4ec4ea5214..f580cbbebd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -24,7 +24,6 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -71,8 +70,8 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -88,6 +87,7 @@
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -663,7 +663,7 @@ public void testBlockECRecoveryCommand() {
     short[] liveBlkIndices0 = new short[2];
     BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
         new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
-        liveBlkIndices0);
+        liveBlkIndices0, ECSchemaManager.getSystemDefaultSchema());
     DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
         DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
     DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
@@ -677,7 +677,7 @@ public void testBlockECRecoveryCommand() {
     short[] liveBlkIndices1 = new short[2];
     BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
         new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
-        liveBlkIndices1);
+        liveBlkIndices1, ECSchemaManager.getSystemDefaultSchema());
     List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
     blkRecoveryInfosList.add(blkECRecoveryInfo0);
     blkRecoveryInfosList.add(blkECRecoveryInfo1);
@@ -718,6 +718,19 @@ private void assertBlockECRecoveryInfoEquals(
     for (int i = 0; i < liveBlockIndices1.length; i++) {
       assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
     }
+
+    ECSchema ecSchema1 = blkECRecoveryInfo1.getECSchema();
+    ECSchema ecSchema2 = blkECRecoveryInfo2.getECSchema();
+    // Both ECSchemas should match the system default schema, since that is
+    // the schema used when constructing the recovery infos in this test.
+    compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema1);
+    compareECSchemas(ECSchemaManager.getSystemDefaultSchema(), ecSchema2);
+  }
+
+  private void compareECSchemas(ECSchema ecSchema1, ECSchema ecSchema2) {
+    assertEquals(ecSchema1.getSchemaName(), ecSchema2.getSchemaName());
+    assertEquals(ecSchema1.getNumDataUnits(), ecSchema2.getNumDataUnits());
+    assertEquals(ecSchema1.getNumParityUnits(), ecSchema2.getNumParityUnits());
   }
 
   private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,