From 922631f04f4bc79f4ceb17958030f80beb4f0576 Mon Sep 17 00:00:00 2001
From: Zhe Zhang
Date: Mon, 20 Apr 2015 14:19:12 -0700
Subject: [PATCH] HDFS-8188. Erasure coding: refactor client-related code to sync with HDFS-8082 and HDFS-8169. Contributed by Zhe Zhang.

---
 .../hdfs/client/HdfsClientConfigKeys.java      | 12 +++++++++
 .../hdfs/protocol/LocatedStripedBlock.java     |  9 -------
 .../org/apache/hadoop/hdfs/DFSClient.java      | 19 ++++----------
 .../hdfs/client/impl/DfsClientConf.java        | 21 ++++++++++++++--
 .../server/blockmanagement/BlockManager.java   | 25 ++++++++++++-------
 .../server/namenode/TestStripedINodeFile.java  |  3 ++-
 6 files changed, 54 insertions(+), 35 deletions(-)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java (84%)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 26283aade8..6006d71faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -177,6 +177,18 @@ interface HedgedRead {
     int THREADPOOL_SIZE_DEFAULT = 0;
   }
 
+  /** dfs.client.read.striped configuration properties */
+  interface StripedRead {
+    String PREFIX = Read.PREFIX + "striped.";
+
+    String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size";
+    /**
+     * With default 6+3 schema, each normal read could span 6 DNs. So this
+     * default value accommodates 3 read streams
+     */
+    int THREADPOOL_SIZE_DEFAULT = 18;
+  }
+
   /** dfs.http.client configuration properties */
   interface HttpClient {
     String PREFIX = "dfs.http.client.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
similarity index 84%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 98614db446..93a5948007 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 
 import java.util.Arrays;
 
@@ -43,14 +42,6 @@ public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
     System.arraycopy(indices, 0, blockIndices, 0, indices.length);
   }
 
-  public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
-      int[] indices, long startOffset, boolean corrupt) {
-    this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
-        DatanodeStorageInfo.toStorageIDs(storages),
-        DatanodeStorageInfo.toStorageTypes(storages), indices,
-        startOffset, corrupt, EMPTY_LOCS);
-  }
-
   @Override
   public String toString() {
     return getClass().getSimpleName() + "{" + getBlock()
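The `StripedRead` interface above introduces `dfs.client.read.striped.threadpool.size`. The default of 18 follows from the javadoc's reasoning: under the default RS 6+3 schema, a positional read of one full stripe fans out to the 6 DataNodes holding the stripe's data blocks, so 18 threads leave room for roughly three concurrent stripe reads per client. A minimal sketch of how the key resolves (the literal key string is written out here for illustration; the patch composes it from `Read.PREFIX`):

```java
import org.apache.hadoop.conf.Configuration;

public class StripedReadPoolSizeDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Equivalent to StripedRead.THREADPOOL_SIZE_KEY with the
    // StripedRead.THREADPOOL_SIZE_DEFAULT fallback (6 DNs x 3 streams = 18).
    int size = conf.getInt("dfs.client.read.striped.threadpool.size", 18);
    System.out.println("striped read threadpool size = " + size);
  }
}
```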
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 9c17a4ef17..0a67cf7550 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -382,21 +382,12 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
         dfsClientConf);
 
     if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
-      this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
+      this.initThreadsNumForHedgedReads(dfsClientConf.
+          getHedgedReadThreadpoolSize());
     }
-    numThreads = conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
-        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
-    if (numThreads <= 0) {
-      LOG.warn("The value of "
-          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
-          + " must be greater than 0. The current setting is " + numThreads
-          + ". Reset it to the default value "
-          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
-      numThreads =
-          DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
-    }
-    this.initThreadsNumForStripedReads(numThreads);
+
+    this.initThreadsNumForStripedReads(dfsClientConf.
+        getStripedReadThreadpoolSize());
     this.saslClient = new SaslDataTransferClient(
       conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
       TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
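This hunk removes the warn-and-reset validation from the `DFSClient` constructor (along with the local `numThreads` variable) and simply asks `DfsClientConf` for the pool size; the validation moves into `DfsClientConf`, which now fails fast instead of silently falling back to the default (see the `Preconditions.checkArgument` call in the next file). A standalone sketch of the two behaviors, with hypothetical method names:

```java
import com.google.common.base.Preconditions;

public class PoolSizeValidationDemo {
  // Old DFSClient behavior: warn and silently reset to the default.
  static int lenient(int configured, int dflt) {
    if (configured <= 0) {
      System.err.println("pool size must be > 0; resetting to " + dflt);
      return dflt;
    }
    return configured;
  }

  // New DfsClientConf behavior: reject the bad value at construction time.
  static int strict(int configured) {
    Preconditions.checkArgument(configured > 0,
        "The value of dfs.client.read.striped.threadpool.size"
            + " must be greater than 0.");
    return configured;
  }

  public static void main(String[] args) {
    System.out.println(lenient(0, 18)); // warns, returns 18
    System.out.println(strict(18));     // returns 18
    strict(0);                          // throws IllegalArgumentException
  }
}
```

Failing fast makes a misconfiguration visible immediately rather than masking it behind a log message that is easy to miss.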
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index a257e32bed..32a3da0a7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -38,6 +38,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,6 +102,8 @@ public class DfsClientConf {
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
 
+  private final int stripedReadThreadpoolSize;
+
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
     hdfsTimeout = Client.getTimeout(conf);
@@ -191,7 +194,7 @@ public DfsClientConf(Configuration conf) {
     connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
         DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
     hdfsBlocksMetadataEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
     fileBlockStorageLocationsNumThreads = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
@@ -215,6 +218,13 @@ public DfsClientConf(Configuration conf) {
     hedgedReadThreadpoolSize = conf.getInt(
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
+
+    stripedReadThreadpoolSize = conf.getInt(
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
+    Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
+        " must be greater than 0.");
   }
 
   private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -491,6 +501,13 @@ public int getHedgedReadThreadpoolSize() {
     return hedgedReadThreadpoolSize;
   }
 
+  /**
+   * @return the stripedReadThreadpoolSize
+   */
+  public int getStripedReadThreadpoolSize() {
+    return stripedReadThreadpoolSize;
+  }
+
   /**
    * @return the shortCircuitConf
    */
@@ -744,4 +761,4 @@ public String confAsString() {
       return builder.toString();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 90507e99bc..6657e5b369 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -874,7 +874,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
       final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
       final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
           blk);
-      return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
+      return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
           false);
     } else {
       assert blk instanceof BlockInfoContiguousUnderConstruction;
@@ -883,13 +883,8 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
       final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
       final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
           blk);
-      return new LocatedBlock(eb, storages, pos, false);
+      return newLocatedBlock(eb, storages, pos, false);
     }
-    final BlockInfoContiguousUnderConstruction uc =
-        (BlockInfoContiguousUnderConstruction) blk;
-    final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
-    return newLocatedBlock(eb, storages, pos, false);
   }
 
   // get block locations
@@ -932,7 +927,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
     final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
     return blockIndices == null ?
         newLocatedBlock(eb, machines, pos, isCorrupt) :
-        new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
+        newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
   }
 
   /** Create a LocatedBlocks. */
@@ -3514,7 +3509,7 @@ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
       if (pendingReplicationBlocksCount == 0 &&
           underReplicatedBlocksCount == 0) {
         LOG.info("Node {} is dead and there are no under-replicated" +
-            " blocks or blocks pending replication. Safe to decommission.", 
+            " blocks or blocks pending replication. Safe to decommission.",
             node);
         return true;
       }
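The `createLocatedBlock` hunks above replace the direct `new LocatedBlock(...)` and `new LocatedStripedBlock(...)` calls with the static factories (`newLocatedBlock`, plus the `newLocatedStripedBlock` added in the next hunk) and drop the statements after the if/else, which were unreachable since both branches return. The factory performs the server-to-client conversion that used to live in the now-removed `LocatedStripedBlock` constructor: it unzips a `DatanodeStorageInfo[]` into the parallel arrays the client-side constructor takes, which is what lets `LocatedStripedBlock` move into `hadoop-hdfs-client` with no dependency on server classes. A self-contained sketch of that conversion pattern, using stand-in types rather than the real HDFS classes:

```java
public class StripedBlockFactorySketch {
  // Stand-in for the server-side DatanodeStorageInfo.
  static class Storage {
    final String datanodeId, storageId, storageType;
    Storage(String dn, String sid, String type) {
      datanodeId = dn; storageId = sid; storageType = type;
    }
  }

  // Mirrors the shape of BlockManager#newLocatedStripedBlock: unzip the
  // server-side array into parallel arrays, so the client-side block class
  // never needs to reference the server-side type.
  static void describeStripedBlock(Storage[] storages, int[] indices) {
    String[] dns = new String[storages.length];
    String[] ids = new String[storages.length];
    String[] types = new String[storages.length];
    for (int i = 0; i < storages.length; i++) {
      dns[i] = storages[i].datanodeId;
      ids[i] = storages[i].storageId;
      types[i] = storages[i].storageType;
    }
    for (int i = 0; i < storages.length; i++) {
      System.out.println(dns[i] + "/" + ids[i] + "/" + types[i]
          + " holds block index " + indices[i]);
    }
  }

  public static void main(String[] args) {
    Storage[] s = { new Storage("dn1", "s1", "DISK"),
                    new Storage("dn2", "s2", "DISK") };
    describeStripedBlock(s, new int[] { 0, 1 });
  }
}
```

Keeping the conversion on the server side means the client module's public types carry no compile-time dependency on `hadoop-hdfs` internals.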
@@ -3920,6 +3915,18 @@ public static LocatedBlock newLocatedBlock(
         null);
   }
 
+  public static LocatedStripedBlock newLocatedStripedBlock(
+      ExtendedBlock b, DatanodeStorageInfo[] storages,
+      int[] indices, long startOffset, boolean corrupt) {
+    // startOffset is unknown
+    return new LocatedStripedBlock(
+        b, DatanodeStorageInfo.toDatanodeInfos(storages),
+        DatanodeStorageInfo.toStorageIDs(storages),
+        DatanodeStorageInfo.toStorageTypes(storages),
+        indices, startOffset, corrupt,
+        null);
+  }
+
   /**
    * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
    * to represent a task to recover a block through replication or erasure
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java
index d251c30ad8..4a6d6cc80d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
@@ -45,7 +46,7 @@ public class TestStripedINodeFile {
       "userName", null, FsPermission.getDefault());
 
   private static INodeFile createStripedINodeFile() {
-    return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
+    return new INodeFile(HdfsConstantsClient.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
         null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID);
   }
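The test change tracks the HDFS-8082/HDFS-8169 refactoring this patch syncs with: `GRANDFATHER_INODE_ID` now comes from the client-side `HdfsConstantsClient` instead of the server-side `INodeId`, so the test no longer imports a NameNode class. A minimal sketch of the relocation pattern (the stand-in names and the constant's value of 0 are assumptions for illustration; the patch itself only shows the import switch):

```java
// Hypothetical stand-in for the client-side constants holder.
interface HdfsConstantsClientSketch {
  long GRANDFATHER_INODE_ID = 0; // assumed value, not shown in this patch
}

class GrandfatherInodeIdDemo {
  public static void main(String[] args) {
    // Callers switch from INodeId.GRANDFATHER_INODE_ID to the client constant:
    System.out.println(HdfsConstantsClientSketch.GRANDFATHER_INODE_ID);
  }
}
```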