From a3954ccab148bddc290cb96528e63ff19799bcc9 Mon Sep 17 00:00:00 2001
From: Chris Douglas
Date: Fri, 5 May 2017 12:01:26 -0700
Subject: [PATCH] HDFS-9807. Add an optional StorageID to writes. Contributed
 by Ewan Higgs

---
 .../org/apache/hadoop/hdfs/DataStreamer.java       |  35 +-
 .../hadoop/hdfs/StripedDataStreamer.java           |  10 +-
 .../datatransfer/DataTransferProtocol.java         |  19 +-
 .../hdfs/protocol/datatransfer/Sender.java         |  29 +-
 .../hdfs/protocolPB/PBHelperClient.java            |  13 +
 .../token/block/BlockTokenIdentifier.java          |  36 +-
 .../src/main/proto/datatransfer.proto              |   4 +
 .../src/main/proto/hdfs.proto                      |   1 +
 .../hdfs/protocol/datatransfer/Receiver.java       |  20 +-
 .../block/BlockPoolTokenSecretManager.java         |  22 +-
 .../token/block/BlockTokenSecretManager.java       |  55 +--
 .../hdfs/server/balancer/Dispatcher.java           |   5 +-
 .../hdfs/server/balancer/KeyManager.java           |   4 +-
 .../server/blockmanagement/BlockManager.java       |   6 +-
 .../hdfs/server/datanode/BPOfferService.java       |   3 +-
 .../hdfs/server/datanode/BlockReceiver.java        |  12 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |  42 ++-
 .../hdfs/server/datanode/DataXceiver.java          |  68 ++--
 .../erasurecode/ErasureCodingWorker.java           |   3 +-
 .../erasurecode/StripedBlockReader.java            |   2 +-
 .../erasurecode/StripedBlockWriter.java            |  10 +-
 .../StripedReconstructionInfo.java                 |  16 +-
 .../datanode/erasurecode/StripedWriter.java        |   5 +-
 .../AvailableSpaceVolumeChoosingPolicy.java        |  20 +-
 .../datanode/fsdataset/FsDatasetSpi.java           |   6 +-
 .../RoundRobinVolumeChoosingPolicy.java            |   2 +-
 .../fsdataset/VolumeChoosingPolicy.java            |   5 +-
 .../fsdataset/impl/FsDatasetImpl.java              |  21 +-
 .../datanode/fsdataset/impl/FsVolumeList.java      |  19 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java        |   3 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java        |  75 +++-
 .../hadoop/hdfs/TestDataTransferProtocol.java      |   3 +-
 .../TestWriteBlockGetsBlockLengthHint.java         |   6 +-
 .../security/token/block/TestBlockToken.java       |  98 +++---
 .../server/datanode/BlockReportTestBase.java       |   2 +-
 .../server/datanode/SimulatedFSDataset.java        |  19 +-
 .../server/datanode/TestBlockRecovery.java         |   6 +-
 .../server/datanode/TestBlockReplacement.java      |   2 +-
 .../TestDataXceiverLazyPersistHint.java            |   4 +-
 .../hdfs/server/datanode/TestDiskError.java        |   5 +-
 .../datanode/TestSimulatedFSDataset.java           |   4 +-
 .../extdataset/ExternalDatasetImpl.java            |  10 +-
 ...estAvailableSpaceVolumeChoosingPolicy.java      |  76 ++--
 .../TestRoundRobinVolumeChoosingPolicy.java        |  29 +-
 .../fsdataset/impl/TestFsDatasetImpl.java          |   4 +-
 .../fsdataset/impl/TestFsVolumeList.java           |   2 +-
 .../fsdataset/impl/TestWriteToReplica.java         |  29 +-
 .../TestNamenodeStorageDirectives.java             | 330 ++++++++++++++++++
 48 files changed, 903 insertions(+), 297 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 026853736d..49c17b9b73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -174,10 +174,12 @@ private class StreamerStreams implements java.io.Closeable {
     void sendTransferBlock(final DatanodeInfo[] targets,
         final StorageType[] targetStorageTypes,
+        final String[] targetStorageIDs,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
      //send the TRANSFER_BLOCK
request new Sender(out).transferBlock(block.getCurrentBlock(), blockToken, - dfsClient.clientName, targets, targetStorageTypes); + dfsClient.clientName, targets, targetStorageTypes, + targetStorageIDs); out.flush(); //ack BlockOpResponseProto transferResponse = BlockOpResponseProto @@ -1367,9 +1369,11 @@ private void addDatanode2ExistingPipeline() throws IOException { final DatanodeInfo src = original[tried % original.length]; final DatanodeInfo[] targets = {nodes[d]}; final StorageType[] targetStorageTypes = {storageTypes[d]}; + final String[] targetStorageIDs = {storageIDs[d]}; try { - transfer(src, targets, targetStorageTypes, lb.getBlockToken()); + transfer(src, targets, targetStorageTypes, targetStorageIDs, + lb.getBlockToken()); } catch (IOException ioe) { DFSClient.LOG.warn("Error transferring data from " + src + " to " + nodes[d] + ": " + ioe.getMessage()); @@ -1400,6 +1404,7 @@ private long computeTransferReadTimeout() { private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String[] targetStorageIDs, final Token blockToken) throws IOException { //transfer replica to the new datanode @@ -1412,7 +1417,8 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, streams = new StreamerStreams(src, writeTimeout, readTimeout, blockToken); - streams.sendTransferBlock(targets, targetStorageTypes, blockToken); + streams.sendTransferBlock(targets, targetStorageTypes, + targetStorageIDs, blockToken); return; } catch (InvalidEncryptionKeyException e) { policy.recordFailure(e); @@ -1440,11 +1446,12 @@ private void setupPipelineForAppendOrRecovery() throws IOException { streamerClosed = true; return; } - setupPipelineInternal(nodes, storageTypes); + setupPipelineInternal(nodes, storageTypes, storageIDs); } protected void setupPipelineInternal(DatanodeInfo[] datanodes, - StorageType[] nodeStorageTypes) throws IOException { + StorageType[] nodeStorageTypes, String[] nodeStorageIDs) + throws IOException { boolean success = false; long newGS = 0L; while (!success && !streamerClosed && dfsClient.clientRunning) { @@ -1465,7 +1472,8 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes, accessToken = lb.getBlockToken(); // set up the pipeline again with the remaining nodes - success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, + isRecovery); failPacket4Testing(); @@ -1601,7 +1609,8 @@ DatanodeInfo[] getExcludedNodes() { protected LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb; DatanodeInfo[] nodes; - StorageType[] storageTypes; + StorageType[] nextStorageTypes; + String[] nextStorageIDs; int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success; final ExtendedBlock oldBlock = block.getCurrentBlock(); @@ -1617,10 +1626,12 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); - storageTypes = lb.getStorageTypes(); + nextStorageTypes = lb.getStorageTypes(); + nextStorageIDs = lb.getStorageIDs(); // Connect to first DataNode in the list. 
- success = createBlockOutputStream(nodes, storageTypes, 0L, false); + success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, + 0L, false); if (!success) { LOG.warn("Abandoning " + block); @@ -1643,7 +1654,8 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { // Returns true if success, otherwise return failure. // boolean createBlockOutputStream(DatanodeInfo[] nodes, - StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { + StorageType[] nodeStorageTypes, String[] nodeStorageIDs, + long newGS, boolean recoveryFlag) { if (nodes.length == 0) { LOG.info("nodes are empty for write pipeline of " + block); return false; @@ -1696,7 +1708,8 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, - (targetPinnings != null && targetPinnings[0]), targetPinnings); + (targetPinnings != null && targetPinnings[0]), targetPinnings, + nodeStorageIDs[0], nodeStorageIDs); // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index b457edbf8d..d920f18e24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -100,9 +100,11 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { DatanodeInfo[] nodes = lb.getLocations(); StorageType[] storageTypes = lb.getStorageTypes(); + String[] storageIDs = lb.getStorageIDs(); // Connect to the DataNode. If fail the internal error state will be set. - success = createBlockOutputStream(nodes, storageTypes, 0L, false); + success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L, + false); if (!success) { block.setCurrentBlock(null); @@ -121,7 +123,8 @@ LocatedBlock peekFollowingBlock() { @Override protected void setupPipelineInternal(DatanodeInfo[] nodes, - StorageType[] nodeStorageTypes) throws IOException { + StorageType[] nodeStorageTypes, String[] nodeStorageIDs) + throws IOException { boolean success = false; while (!success && !streamerClosed() && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { @@ -141,7 +144,8 @@ protected void setupPipelineInternal(DatanodeInfo[] nodes, // set up the pipeline again with the remaining nodes. when a striped // data streamer comes here, it must be in external error state. 
assert getErrorState().hasExternalError(); - success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true); + success = createBlockOutputStream(nodes, nodeStorageTypes, + nodeStorageIDs, newGS, true); failPacket4Testing(); getErrorState().checkRestartingNodeDeadline(nodes); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 6c5883c3ff..fe20c37032 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -101,6 +101,11 @@ void readBlock(final ExtendedBlock blk, * written to disk lazily * @param pinning whether to pin the block, so Balancer won't move it. * @param targetPinnings whether to pin the block on target datanode + * @param storageID optional StorageIDs designating where to write the + * block. An empty String or null indicates that this + * has not been provided. + * @param targetStorageIDs target StorageIDs corresponding to the target + * datanodes. */ void writeBlock(final ExtendedBlock blk, final StorageType storageType, @@ -118,7 +123,9 @@ void writeBlock(final ExtendedBlock blk, final CachingStrategy cachingStrategy, final boolean allowLazyPersist, final boolean pinning, - final boolean[] targetPinnings) throws IOException; + final boolean[] targetPinnings, + final String storageID, + final String[] targetStorageIDs) throws IOException; /** * Transfer a block to another datanode. * The block stage must be @@ -129,12 +136,15 @@ void writeBlock(final ExtendedBlock blk, * @param blockToken security token for accessing the block. * @param clientName client's name. * @param targets target datanodes. + * @param targetStorageIDs StorageID designating where to write the + * block. */ void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException; + final StorageType[] targetStorageTypes, + final String[] targetStorageIDs) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -179,12 +189,15 @@ void requestShortCircuitFds(final ExtendedBlock blk, * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. + * @param storageId an optional storage ID to designate where the block is + * replaced to. */ void replaceBlock(final ExtendedBlock blk, final StorageType storageType, final Token blockToken, final String delHint, - final DatanodeInfo source) throws IOException; + final DatanodeInfo source, + final String storageId) throws IOException; /** * Copy a block. 
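The javadoc added above to DataTransferProtocol#writeBlock spells out the contract for the new arguments: storageID may be null or an empty String when the caller does not designate a storage, and targetStorageIDs is parallel to targets (and targetStorageTypes), so targetStorageIDs[i] pairs with targets[i]. The following is a minimal, self-contained sketch of that convention; StorageIdArgs is a hypothetical illustration-only class, not part of this patch or of HDFS.

    import java.util.Arrays;

    /**
     * Illustration of the "null or empty means not provided" convention and
     * of the parallel-array pairing described in the DataTransferProtocol
     * javadoc above. Hypothetical helper; not part of the patch.
     */
    final class StorageIdArgs {
      final String storageId;          // storage on the first datanode, or null
      final String[] targetStorageIds; // targetStorageIds[i] pairs with targets[i]

      StorageIdArgs(String storageId, String[] targetStorageIds) {
        this.storageId = storageId;
        this.targetStorageIds =
            targetStorageIds == null ? new String[0] : targetStorageIds;
      }

      boolean hasStorageId() {
        return storageId != null && !storageId.isEmpty();
      }

      public static void main(String[] args) {
        // A caller that tracks storage IDs for a two-node pipeline:
        StorageIdArgs withIds =
            new StorageIdArgs("DS-1", new String[]{"DS-1", "DS-2"});
        // A legacy caller that has no storage IDs to pass:
        StorageIdArgs withoutIds = new StorageIdArgs(null, null);

        System.out.println(withIds.hasStorageId());                       // true
        System.out.println(Arrays.toString(withoutIds.targetStorageIds)); // []
      }
    }
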
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index e133975f53..8a8d20ddb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -132,7 +133,9 @@ public void writeBlock(final ExtendedBlock blk, final CachingStrategy cachingStrategy, final boolean allowLazyPersist, final boolean pinning, - final boolean[] targetPinnings) throws IOException { + final boolean[] targetPinnings, + final String storageId, + final String[] targetStorageIds) throws IOException { ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken); @@ -154,11 +157,14 @@ public void writeBlock(final ExtendedBlock blk, .setCachingStrategy(getCachingStrategy(cachingStrategy)) .setAllowLazyPersist(allowLazyPersist) .setPinning(pinning) - .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); - + .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)) + .addAllTargetStorageIds(PBHelperClient.convert(targetStorageIds, 1)); if (source != null) { proto.setSource(PBHelperClient.convertDatanodeInfo(source)); } + if (storageId != null) { + proto.setStorageId(storageId); + } send(out, Op.WRITE_BLOCK, proto.build()); } @@ -168,7 +174,8 @@ public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException { + final StorageType[] targetStorageTypes, + final String[] targetStorageIds) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( @@ -176,6 +183,7 @@ public void transferBlock(final ExtendedBlock blk, .addAllTargets(PBHelperClient.convert(targets)) .addAllTargetStorageTypes( PBHelperClient.convertStorageTypes(targetStorageTypes)) + .addAllTargetStorageIds(Arrays.asList(targetStorageIds)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -233,15 +241,18 @@ public void replaceBlock(final ExtendedBlock blk, final StorageType storageType, final Token blockToken, final String delHint, - final DatanodeInfo source) throws IOException { - OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() + final DatanodeInfo source, + final String storageId) throws IOException { + OpReplaceBlockProto.Builder proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) .setStorageType(PBHelperClient.convertStorageType(storageType)) .setDelHint(delHint) - .setSource(PBHelperClient.convertDatanodeInfo(source)) - .build(); + .setSource(PBHelperClient.convertDatanodeInfo(source)); + if (storageId != null) { + proto.setStorageId(storageId); + } - send(out, Op.REPLACE_BLOCK, proto); + send(out, Op.REPLACE_BLOCK, proto.build()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 2b8f102064..614f653f65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -345,6 +345,16 @@ public static List convert(boolean[] targetPinnings, int idx) { return pinnings; } + public static List convert(String[] targetIds, int idx) { + List ids = new ArrayList<>(); + if (targetIds != null) { + for (; idx < targetIds.length; ++idx) { + ids.add(targetIds[idx]); + } + } + return ids; + } + public static ExtendedBlock convert(ExtendedBlockProto eb) { if (eb == null) return null; return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(), @@ -640,6 +650,9 @@ public static BlockTokenSecretProto convert( for (StorageType storageType : blockTokenSecret.getStorageTypes()) { builder.addStorageTypes(convertStorageType(storageType)); } + for (String storageId : blockTokenSecret.getStorageIds()) { + builder.addStorageIds(storageId); + } return builder.build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java index 228a7b67d5..5950752d1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java @@ -53,16 +53,19 @@ public enum AccessMode { private long blockId; private final EnumSet modes; private StorageType[] storageTypes; + private String[] storageIds; private boolean useProto; private byte [] cache; public BlockTokenIdentifier() { - this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false); + this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, null, + false); } public BlockTokenIdentifier(String userId, String bpid, long blockId, - EnumSet modes, StorageType[] storageTypes, boolean useProto) { + EnumSet modes, StorageType[] storageTypes, + String[] storageIds, boolean useProto) { this.cache = null; this.userId = userId; this.blockPoolId = bpid; @@ -70,6 +73,8 @@ public BlockTokenIdentifier(String userId, String bpid, long blockId, this.modes = modes == null ? 
EnumSet.noneOf(AccessMode.class) : modes; this.storageTypes = Optional.ofNullable(storageTypes) .orElse(StorageType.EMPTY_ARRAY); + this.storageIds = Optional.ofNullable(storageIds) + .orElse(new String[0]); this.useProto = useProto; } @@ -125,6 +130,10 @@ public StorageType[] getStorageTypes(){ return storageTypes; } + public String[] getStorageIds(){ + return storageIds; + } + @Override public String toString() { return "block_token_identifier (expiryDate=" + this.getExpiryDate() @@ -132,7 +141,8 @@ public String toString() { + ", blockPoolId=" + this.getBlockPoolId() + ", blockId=" + this.getBlockId() + ", access modes=" + this.getAccessModes() + ", storageTypes= " - + Arrays.toString(this.getStorageTypes()) + ")"; + + Arrays.toString(this.getStorageTypes()) + ", storageIds= " + + Arrays.toString(this.getStorageIds()) + ")"; } static boolean isEqual(Object a, Object b) { @@ -151,7 +161,8 @@ && isEqual(this.userId, that.userId) && isEqual(this.blockPoolId, that.blockPoolId) && this.blockId == that.blockId && isEqual(this.modes, that.modes) - && Arrays.equals(this.storageTypes, that.storageTypes); + && Arrays.equals(this.storageTypes, that.storageTypes) + && Arrays.equals(this.storageIds, that.storageIds); } return false; } @@ -161,7 +172,8 @@ public int hashCode() { return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode() ^ (userId == null ? 0 : userId.hashCode()) ^ (blockPoolId == null ? 0 : blockPoolId.hashCode()) - ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes)); + ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes)) + ^ (storageIds == null ? 0 : Arrays.hashCode(storageIds)); } /** @@ -220,6 +232,14 @@ void readFieldsLegacy(DataInput in) throws IOException { readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class); } storageTypes = readStorageTypes; + + length = WritableUtils.readVInt(in); + String[] readStorageIds = new String[length]; + for (int i = 0; i < length; i++) { + readStorageIds[i] = WritableUtils.readString(in); + } + storageIds = readStorageIds; + useProto = false; } @@ -248,6 +268,8 @@ void readFieldsProtobuf(DataInput in) throws IOException { storageTypes = blockTokenSecretProto.getStorageTypesList().stream() .map(PBHelperClient::convertStorageType) .toArray(StorageType[]::new); + storageIds = blockTokenSecretProto.getStorageIdsList().stream() + .toArray(String[]::new); useProto = true; } @@ -275,6 +297,10 @@ void writeLegacy(DataOutput out) throws IOException { for (StorageType type: storageTypes){ WritableUtils.writeEnum(out, type); } + WritableUtils.writeVInt(out, storageIds.length); + for (String id: storageIds) { + WritableUtils.writeString(out, id); + } } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 889361aa6d..2356201f04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -125,12 +125,15 @@ message OpWriteBlockProto { //whether to pin the block, so Balancer won't move it. 
optional bool pinning = 14 [default = false]; repeated bool targetPinnings = 15; + optional string storageId = 16; + repeated string targetStorageIds = 17; } message OpTransferBlockProto { required ClientOperationHeaderProto header = 1; repeated DatanodeInfoProto targets = 2; repeated StorageTypeProto targetStorageTypes = 3; + repeated string targetStorageIds = 4; } message OpReplaceBlockProto { @@ -138,6 +141,7 @@ message OpReplaceBlockProto { required string delHint = 2; required DatanodeInfoProto source = 3; optional StorageTypeProto storageType = 4 [default = DISK]; + optional string storageId = 5; } message OpCopyBlockProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 3e27427831..08ed3c8e6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -570,4 +570,5 @@ message BlockTokenSecretProto { optional uint64 blockId = 5; repeated AccessModeProto modes = 6; repeated StorageTypeProto storageTypes = 7; + repeated string storageIds = 8; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 08ab967b54..bab2e8da5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -25,7 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -185,7 +187,9 @@ private void opWriteBlock(DataInputStream in) throws IOException { CachingStrategy.newDefaultStrategy()), (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false), (proto.hasPinning() ? 
proto.getPinning(): false), - (PBHelperClient.convertBooleanList(proto.getTargetPinningsList()))); + (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())), + proto.getStorageId(), + proto.getTargetStorageIdsList().toArray(new String[0])); } finally { if (traceScope != null) traceScope.close(); } @@ -199,11 +203,18 @@ private void opTransferBlock(DataInputStream in) throws IOException { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()), + final ExtendedBlock block = + PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()); + final StorageType[] targetStorageTypes = + PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), + targets.length); + transferBlock(block, PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), targets, - PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); + targetStorageTypes, + proto.getTargetStorageIdsList().toArray(new String[0]) + ); } finally { if (traceScope != null) traceScope.close(); } @@ -264,7 +275,8 @@ private void opReplaceBlock(DataInputStream in) throws IOException { PBHelperClient.convertStorageType(proto.getStorageType()), PBHelperClient.convert(proto.getHeader().getToken()), proto.getDelHint(), - PBHelperClient.convert(proto.getSource())); + PBHelperClient.convert(proto.getSource()), + proto.getStorageId()); } finally { if (traceScope != null) traceScope.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 29fb73f765..8400b4f04b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -84,25 +84,27 @@ public byte[] retrievePassword(BlockTokenIdentifier identifier) /** * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier, * String, ExtendedBlock, BlockTokenIdentifier.AccessMode, - * StorageType[])} + * StorageType[], String[])} */ public void checkAccess(BlockTokenIdentifier id, String userId, ExtendedBlock block, AccessMode mode, - StorageType[] storageTypes) throws InvalidToken { + StorageType[] storageTypes, String[] storageIds) + throws InvalidToken { get(block.getBlockPoolId()).checkAccess(id, userId, block, mode, - storageTypes); + storageTypes, storageIds); } /** * See {@link BlockTokenSecretManager#checkAccess(Token, String, * ExtendedBlock, BlockTokenIdentifier.AccessMode, - * StorageType[])}. + * StorageType[], String[])} */ public void checkAccess(Token token, String userId, ExtendedBlock block, AccessMode mode, - StorageType[] storageTypes) throws InvalidToken { + StorageType[] storageTypes, String[] storageIds) + throws InvalidToken { get(block.getBlockPoolId()).checkAccess(token, userId, block, mode, - storageTypes); + storageTypes, storageIds); } /** @@ -115,11 +117,13 @@ public void addKeys(String bpid, ExportedBlockKeys exportedKeys) /** * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet, - * StorageType[])} + * StorageType[], String[])}. 
*/ public Token generateToken(ExtendedBlock b, - EnumSet of, StorageType[] storageTypes) throws IOException { - return get(b.getBlockPoolId()).generateToken(b, of, storageTypes); + EnumSet of, StorageType[] storageTypes, String[] storageIds) + throws IOException { + return get(b.getBlockPoolId()).generateToken(b, of, storageTypes, + storageIds); } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index f3bec83de1..6b54490e4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -247,18 +247,19 @@ synchronized boolean updateKeys() throws IOException { /** Generate an block token for current user */ public Token generateToken(ExtendedBlock block, EnumSet modes, - StorageType[] storageTypes) throws IOException { + StorageType[] storageTypes, String[] storageIds) throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); String userID = (ugi == null ? null : ugi.getShortUserName()); - return generateToken(userID, block, modes, storageTypes); + return generateToken(userID, block, modes, storageTypes, storageIds); } /** Generate a block token for a specified user */ public Token generateToken(String userId, ExtendedBlock block, EnumSet modes, - StorageType[] storageTypes) throws IOException { + StorageType[] storageTypes, String[] storageIds) throws IOException { BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block - .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto); + .getBlockPoolId(), block.getBlockId(), modes, storageTypes, + storageIds, useProto); return new Token(id, this); } @@ -272,10 +273,13 @@ public Token generateToken(String userId, */ public void checkAccess(BlockTokenIdentifier id, String userId, ExtendedBlock block, BlockTokenIdentifier.AccessMode mode, - StorageType[] storageTypes) throws InvalidToken { + StorageType[] storageTypes, String[] storageIds) throws InvalidToken { checkAccess(id, userId, block, mode); if (storageTypes != null && storageTypes.length > 0) { - checkAccess(id.getStorageTypes(), storageTypes); + checkAccess(id.getStorageTypes(), storageTypes, "StorageTypes"); + } + if (storageIds != null && storageIds.length > 0) { + checkAccess(id.getStorageIds(), storageIds, "StorageIDs"); } } @@ -309,30 +313,31 @@ public void checkAccess(BlockTokenIdentifier id, String userId, } /** - * Check if the requested StorageTypes match the StorageTypes in the - * BlockTokenIdentifier. - * Empty candidateStorageTypes specifiers mean 'all is permitted'. They - * would otherwise be nonsensical. + * Check if the requested values can be satisfied with the values in the + * BlockToken. This is intended for use with StorageTypes and StorageIDs. + * + * The current node can only verify that one of the storage [Type|ID] is + * available. The rest will be on different nodes. */ - public static void checkAccess(StorageType[] candidateStorageTypes, - StorageType[] storageTypesRequested) throws InvalidToken { - if (storageTypesRequested.length == 0) { - throw new InvalidToken("The request has no StorageTypes. 
" + public static void checkAccess(T[] candidates, T[] requested, String msg) + throws InvalidToken { + if (requested.length == 0) { + throw new InvalidToken("The request has no " + msg + ". " + "This is probably a configuration error."); } - if (candidateStorageTypes.length == 0) { + if (candidates.length == 0) { return; } - List unseenCandidates = new ArrayList(); - unseenCandidates.addAll(Arrays.asList(candidateStorageTypes)); - for (StorageType storageType : storageTypesRequested) { - final int index = unseenCandidates.indexOf(storageType); + List unseenCandidates = new ArrayList(); + unseenCandidates.addAll(Arrays.asList(candidates)); + for (T req : requested) { + final int index = unseenCandidates.indexOf(req); if (index == -1) { - throw new InvalidToken("Block token with StorageTypes " - + Arrays.toString(candidateStorageTypes) - + " not valid for access with StorageTypes " - + Arrays.toString(storageTypesRequested)); + throw new InvalidToken("Block token with " + msg + " " + + Arrays.toString(candidates) + + " not valid for access with " + msg + " " + + Arrays.toString(requested)); } Collections.swap(unseenCandidates, index, unseenCandidates.size()-1); unseenCandidates.remove(unseenCandidates.size()-1); @@ -342,7 +347,7 @@ public static void checkAccess(StorageType[] candidateStorageTypes, /** Check if access should be allowed. userID is not checked if null */ public void checkAccess(Token token, String userId, ExtendedBlock block, BlockTokenIdentifier.AccessMode mode, - StorageType[] storageTypes) throws InvalidToken { + StorageType[] storageTypes, String[] storageIds) throws InvalidToken { BlockTokenIdentifier id = new BlockTokenIdentifier(); try { id.readFields(new DataInputStream(new ByteArrayInputStream(token @@ -352,7 +357,7 @@ public void checkAccess(Token token, String userId, "Unable to de-serialize block token identifier for user=" + userId + ", block=" + block + ", access mode=" + mode); } - checkAccess(id, userId, block, mode, storageTypes); + checkAccess(id, userId, block, mode, storageTypes, storageIds); if (!Arrays.equals(retrievePassword(id), token.getPassword())) { throw new InvalidToken("Block token with " + id.toString() + " doesn't have the correct token password"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 91dc90799e..f855e45332 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -357,7 +357,7 @@ private void dispatch() { reportedBlock.getBlock()); final KeyManager km = nnc.getKeyManager(); Token accessToken = km.getAccessToken(eb, - new StorageType[]{target.storageType}); + new StorageType[]{target.storageType}, new String[0]); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, unbufIn, km, accessToken, target.getDatanodeInfo()); unbufOut = saslStreams.out; @@ -411,7 +411,8 @@ private void dispatch() { private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token accessToken) throws IOException { new Sender(out).replaceBlock(eb, target.storageType, accessToken, - source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); + source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode, + null); } /** Check whether to continue waiting for response */ diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java index 06bf07fe13..faf95b706e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java @@ -95,7 +95,7 @@ public void startBlockKeyUpdater() { /** Get an access token for a block. */ public Token getAccessToken(ExtendedBlock eb, - StorageType[] storageTypes) throws IOException { + StorageType[] storageTypes, String[] storageIds) throws IOException { if (!isBlockTokenEnabled) { return BlockTokenSecretManager.DUMMY_TOKEN; } else { @@ -105,7 +105,7 @@ public Token getAccessToken(ExtendedBlock eb, } return blockTokenSecretManager.generateToken(null, eb, EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE, - BlockTokenIdentifier.AccessMode.COPY), storageTypes); + BlockTokenIdentifier.AccessMode.COPY), storageTypes, storageIds); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e63930a00b..8f58e255cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1283,13 +1283,15 @@ public void setBlockToken(final LocatedBlock b, internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]); blockTokens[i] = blockTokenSecretManager.generateToken( NameNode.getRemoteUser().getShortUserName(), - internalBlock, EnumSet.of(mode), b.getStorageTypes()); + internalBlock, EnumSet.of(mode), b.getStorageTypes(), + b.getStorageIDs()); } sb.setBlockTokens(blockTokens); } else { b.setBlockToken(blockTokenSecretManager.generateToken( NameNode.getRemoteUser().getShortUserName(), - b.getBlock(), EnumSet.of(mode), b.getStorageTypes())); + b.getBlock(), EnumSet.of(mode), b.getStorageTypes(), + b.getStorageIDs())); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index e0daca78b9..042169a9e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -679,7 +679,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd, case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), - bcmd.getTargets(), bcmd.getTargetStorageTypes()); + bcmd.getTargets(), bcmd.getTargetStorageTypes(), + bcmd.getTargetStorageIDs()); break; case DatanodeProtocol.DNA_INVALIDATE: // diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 00109e052d..2ab40672bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -151,7 +151,8 @@ class BlockReceiver implements Closeable { final DataNode datanode, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, final boolean allowLazyPersist, - final boolean pinning) throws IOException { + final boolean pinning, + final String storageId) throws IOException { try{ this.block = block; this.in = in; @@ -197,6 +198,7 @@ class BlockReceiver implements Closeable { + "\n allowLazyPersist=" + allowLazyPersist + ", pinning=" + pinning + ", isClient=" + isClient + ", isDatanode=" + isDatanode + ", responseInterval=" + responseInterval + + ", storageID=" + (storageId != null ? storageId : "null") ); } @@ -204,11 +206,13 @@ class BlockReceiver implements Closeable { // Open local disk out // if (isDatanode) { //replication or move - replicaHandler = datanode.data.createTemporary(storageType, block); + replicaHandler = + datanode.data.createTemporary(storageType, storageId, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist); + replicaHandler = datanode.data.createRbw(storageType, storageId, + block, allowLazyPersist); datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); break; @@ -233,7 +237,7 @@ class BlockReceiver implements Closeable { case TRANSFER_FINALIZED: // this is a transfer destination replicaHandler = - datanode.data.createTemporary(storageType, block); + datanode.data.createTemporary(storageType, storageId, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 66ef89ac36..2305e0ba0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1943,7 +1943,7 @@ private void checkBlockToken(ExtendedBlock block, LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode, - null); + null, null); } } @@ -2224,7 +2224,8 @@ private void reportBadBlock(final BPOfferService bpos, @VisibleForTesting void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, - StorageType[] xferTargetStorageTypes) throws IOException { + StorageType[] xferTargetStorageTypes, String[] xferTargetStorageIDs) + throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -2281,17 +2282,19 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, + xferTargetStorageIDs, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { + DatanodeInfo[][] xferTargets, StorageType[][] xferTargetStorageTypes, + String[][] xferTargetStorageIDs) { for (int i = 0; i < blocks.length; i++) { try { 
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], - xferTargetStorageTypes[i]); + xferTargetStorageTypes[i], xferTargetStorageIDs[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -2395,6 +2398,7 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) private class DataTransfer implements Runnable { final DatanodeInfo[] targets; final StorageType[] targetStorageTypes; + final private String[] targetStorageIds; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -2406,8 +2410,8 @@ private class DataTransfer implements Runnable { * entire target list, the block, and the data. */ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, - ExtendedBlock b, BlockConstructionStage stage, - final String clientname) { + String[] targetStorageIds, ExtendedBlock b, + BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " + b + " (numBytes=" + b.getNumBytes() + ")" @@ -2415,10 +2419,13 @@ private class DataTransfer implements Runnable { + ", clientname=" + clientname + ", targets=" + Arrays.asList(targets) + ", target storage types=" + (targetStorageTypes == null ? "[]" : - Arrays.asList(targetStorageTypes))); + Arrays.asList(targetStorageTypes)) + + ", target storage IDs=" + (targetStorageIds == null ? "[]" : + Arrays.asList(targetStorageIds))); } this.targets = targets; this.targetStorageTypes = targetStorageTypes; + this.targetStorageIds = targetStorageIds; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -2456,7 +2463,7 @@ public void run() { // Token accessToken = getBlockAccessToken(b, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - targetStorageTypes); + targetStorageTypes, targetStorageIds); long writeTimeout = dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); @@ -2477,10 +2484,13 @@ public void run() { DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg) .build(); + String storageId = targetStorageIds.length > 0 ? 
+ targetStorageIds[0] : null; new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy, - false, false, null); + false, false, null, storageId, + targetStorageIds); // send data & checksum blockSender.sendBlock(out, unbufOut, null); @@ -2540,12 +2550,12 @@ public void run() { */ public Token getBlockAccessToken(ExtendedBlock b, EnumSet mode, - StorageType[] storageTypes) throws IOException { + StorageType[] storageTypes, String[] storageIds) throws IOException { Token accessToken = BlockTokenSecretManager.DUMMY_TOKEN; if (isBlockTokenEnabled) { accessToken = blockPoolTokenSecretManager.generateToken(b, mode, - storageTypes); + storageTypes, storageIds); } return accessToken; } @@ -2918,7 +2928,7 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException { LOG.debug("Got: " + id.toString()); } blockPoolTokenSecretManager.checkAccess(id, null, block, - BlockTokenIdentifier.AccessMode.READ, null); + BlockTokenIdentifier.AccessMode.READ, null, null); } } } @@ -2934,7 +2944,8 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException { */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, - final String client) throws IOException { + final String[] targetStorageIds, final String client) + throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2967,7 +2978,8 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b, b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, targetStorageIds, b, stage, + client).run(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index cc13799472..d42e3307e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -354,7 +354,8 @@ public void requestShortCircuitFds(final ExtendedBlock blk, updateCurrentThreadName("Passing file descriptors for block " + blk); DataOutputStream out = getBufferedOutputStream(); checkAccess(out, true, blk, token, - Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ); + Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ, + null, null); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; SlotId registeredSlotId = null; @@ -662,7 +663,7 @@ public void writeBlock(final ExtendedBlock block, final Token blockToken, final String clientname, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -673,7 +674,9 @@ public void writeBlock(final ExtendedBlock block, CachingStrategy cachingStrategy, boolean allowLazyPersist, final boolean pinning, - final boolean[] targetPinnings) throws IOException { + final boolean[] targetPinnings, + final String storageId, + final String[] targetStorageIds) throws IOException { previousOpClientName = clientname; 
updateCurrentThreadName("Receiving block " + block); final boolean isDatanode = clientname.length() == 0; @@ -692,8 +695,15 @@ public void writeBlock(final ExtendedBlock block, if (targetStorageTypes.length > 0) { System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst); } + int nsi = targetStorageIds.length; + String[] storageIds = new String[nsi + 1]; + storageIds[0] = storageId; + if (targetStorageTypes.length > 0) { + System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi); + } checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, - BlockTokenIdentifier.AccessMode.WRITE, storageTypes); + BlockTokenIdentifier.AccessMode.WRITE, + storageTypes, storageIds); // check single target for transfer-RBW/Finalized if (isTransfer && targets.length > 0) { @@ -743,7 +753,7 @@ public void writeBlock(final ExtendedBlock block, peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, - cachingStrategy, allowLazyPersist, pinning)); + cachingStrategy, allowLazyPersist, pinning, storageId)); replica = blockReceiver.getReplica(); } else { replica = datanode.data.recoverClose( @@ -796,16 +806,18 @@ public void writeBlock(final ExtendedBlock block, if (targetPinnings != null && targetPinnings.length > 0) { new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], - blockToken, clientname, targets, targetStorageTypes, srcDataNode, - stage, pipelineSize, minBytesRcvd, maxBytesRcvd, - latestGenerationStamp, requestedChecksum, cachingStrategy, - allowLazyPersist, targetPinnings[0], targetPinnings); + blockToken, clientname, targets, targetStorageTypes, + srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy, + allowLazyPersist, targetPinnings[0], targetPinnings, + targetStorageIds[0], targetStorageIds); } else { new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], - blockToken, clientname, targets, targetStorageTypes, srcDataNode, - stage, pipelineSize, minBytesRcvd, maxBytesRcvd, - latestGenerationStamp, requestedChecksum, cachingStrategy, - allowLazyPersist, false, targetPinnings); + blockToken, clientname, targets, targetStorageTypes, + srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy, + allowLazyPersist, false, targetPinnings, + targetStorageIds[0], targetStorageIds); } mirrorOut.flush(); @@ -929,17 +941,19 @@ public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes) throws IOException { + final StorageType[] targetStorageTypes, + final String[] targetStorageIds) throws IOException { previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( getOutputStream()); checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK, - BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes); + BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes, + targetStorageIds); try { datanode.transferReplicaForPipelineRecovery(blk, targets, - targetStorageTypes, clientName); + targetStorageTypes, targetStorageIds, clientName); writeResponse(Status.SUCCESS, null, out); } catch (IOException ioe) { LOG.info("transferBlock " + blk + " received exception " + ioe); @@ -1105,12 +1119,14 @@ public void replaceBlock(final ExtendedBlock block, final StorageType 
storageType, final Token blockToken, final String delHint, - final DatanodeInfo proxySource) throws IOException { + final DatanodeInfo proxySource, + final String storageId) throws IOException { updateCurrentThreadName("Replacing block " + block + " from " + delHint); DataOutputStream replyOut = new DataOutputStream(getOutputStream()); checkAccess(replyOut, true, block, blockToken, Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE, - new StorageType[]{ storageType }); + new StorageType[]{storageType}, + new String[]{storageId}); if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start String msg = "Not able to receive block " + block.getBlockId() + @@ -1131,7 +1147,7 @@ public void replaceBlock(final ExtendedBlock block, // Move the block to different storage in the same datanode if (proxySource.equals(datanode.getDatanodeId())) { ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block, - storageType); + storageType, storageId); if (oldReplica != null) { LOG.info("Moved " + block + " from StorageType " + oldReplica.getVolume().getStorageType() + " to " + storageType); @@ -1188,7 +1204,7 @@ public void replaceBlock(final ExtendedBlock block, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, - CachingStrategy.newDropBehind(), false, false)); + CachingStrategy.newDropBehind(), false, false, storageId)); // receive a block blockReceiver.receiveBlock(null, null, replyOut, null, @@ -1258,11 +1274,12 @@ BlockReceiver getBlockReceiver( final DataNode dn, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, final boolean allowLazyPersist, - final boolean pinning) throws IOException { + final boolean pinning, + final String storageId) throws IOException { return new BlockReceiver(block, storageType, in, inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, dn, requestedChecksum, - cachingStrategy, allowLazyPersist, pinning); + cachingStrategy, allowLazyPersist, pinning, storageId); } /** @@ -1365,7 +1382,7 @@ void checkAndWaitForBP(final ExtendedBlock block) private void checkAccess(OutputStream out, final boolean reply, ExtendedBlock blk, Token t, Op op, BlockTokenIdentifier.AccessMode mode) throws IOException { - checkAccess(out, reply, blk, t, op, mode, null); + checkAccess(out, reply, blk, t, op, mode, null, null); } private void checkAccess(OutputStream out, final boolean reply, @@ -1373,7 +1390,8 @@ private void checkAccess(OutputStream out, final boolean reply, final Token t, final Op op, final BlockTokenIdentifier.AccessMode mode, - final StorageType[] storageTypes) throws IOException { + final StorageType[] storageTypes, + final String[] storageIds) throws IOException { checkAndWaitForBP(blk); if (datanode.isBlockTokenEnabled) { if (LOG.isDebugEnabled()) { @@ -1382,7 +1400,7 @@ private void checkAccess(OutputStream out, final boolean reply, } try { datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode, - storageTypes); + storageTypes, storageIds); } catch(InvalidToken e) { try { if (reply) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 1492e5dd44..e076dda980 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -111,7 +111,8 @@ public void processErasureCodingTasks( new StripedReconstructionInfo( reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(), reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), - reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes()); + reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(), + reconInfo.getTargetStorageIDs()); final StripedBlockReconstructor task = new StripedBlockReconstructor(this, stripedReconInfo); if (task.hasValidTargets()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java index b3884c2855..39ef67e7fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -110,7 +110,7 @@ private BlockReader createBlockReader(long offsetInBlock) { stripedReader.getSocketAddress4Transfer(source); Token blockToken = datanode.getBlockAccessToken( block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ), - StorageType.EMPTY_ARRAY); + StorageType.EMPTY_ARRAY, new String[0]); /* * This can be further improved if the replica is local, then we can * read directly from DN and need to check the replica is FINALIZED diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java index a6989d4f54..24c1d61382 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java @@ -61,6 +61,7 @@ class StripedBlockWriter { private final ExtendedBlock block; private final DatanodeInfo target; private final StorageType storageType; + private final String storageId; private Socket targetSocket; private DataOutputStream targetOutputStream; @@ -72,8 +73,8 @@ class StripedBlockWriter { StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode, Configuration conf, ExtendedBlock block, - DatanodeInfo target, StorageType storageType) - throws IOException { + DatanodeInfo target, StorageType storageType, + String storageId) throws IOException { this.stripedWriter = stripedWriter; this.datanode = datanode; this.conf = conf; @@ -81,6 +82,7 @@ class StripedBlockWriter { this.block = block; this.target = target; this.storageType = storageType; + this.storageId = storageId; this.targetBuffer = stripedWriter.allocateWriteBuffer(); @@ -117,7 +119,7 @@ private void init() throws IOException { Token blockToken = datanode.getBlockAccessToken(block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - new StorageType[]{storageType}); + new StorageType[]{storageType}, new String[]{storageId}); long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); OutputStream unbufOut = NetUtils.getOutputStream(socket, 
writeTimeout); @@ -141,7 +143,7 @@ private void init() throws IOException { new StorageType[]{storageType}, source, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(), - false, false, null); + false, false, null, storageId, new String[]{storageId}); targetSocket = socket; targetOutputStream = out; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java index a5c328bd36..a619c34781 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java @@ -40,24 +40,27 @@ public class StripedReconstructionInfo { private final byte[] targetIndices; private final DatanodeInfo[] targets; private final StorageType[] targetStorageTypes; + private final String[] targetStorageIds; public StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, byte[] targetIndices) { - this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null); + this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, + null, null); } StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, - DatanodeInfo[] targets, StorageType[] targetStorageTypes) { + DatanodeInfo[] targets, StorageType[] targetStorageTypes, + String[] targetStorageIds) { this(blockGroup, ecPolicy, liveIndices, sources, null, targets, - targetStorageTypes); + targetStorageTypes, targetStorageIds); } private StripedReconstructionInfo(ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, byte[] targetIndices, DatanodeInfo[] targets, - StorageType[] targetStorageTypes) { + StorageType[] targetStorageTypes, String[] targetStorageIds) { this.blockGroup = blockGroup; this.ecPolicy = ecPolicy; @@ -66,6 +69,7 @@ private StripedReconstructionInfo(ExtendedBlock blockGroup, this.targetIndices = targetIndices; this.targets = targets; this.targetStorageTypes = targetStorageTypes; + this.targetStorageIds = targetStorageIds; } ExtendedBlock getBlockGroup() { @@ -95,5 +99,9 @@ DatanodeInfo[] getTargets() { StorageType[] getTargetStorageTypes() { return targetStorageTypes; } + + String[] getTargetStorageIds() { + return targetStorageIds; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java index 225a7ed1cc..762506cfda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java @@ -55,6 +55,7 @@ class StripedWriter { private final short[] targetIndices; private boolean hasValidTargets; private final StorageType[] targetStorageTypes; + private final String[] targetStorageIds; private StripedBlockWriter[] writers; @@ -77,6 +78,8 @@ class StripedWriter { assert targets != null; this.targetStorageTypes = 
stripedReconInfo.getTargetStorageTypes(); assert targetStorageTypes != null; + this.targetStorageIds = stripedReconInfo.getTargetStorageIds(); + assert targetStorageIds != null; writers = new StripedBlockWriter[targets.length]; @@ -192,7 +195,7 @@ int initTargetStreams() { private StripedBlockWriter createWriter(short index) throws IOException { return new StripedBlockWriter(this, datanode, conf, reconstructor.getBlock(targetIndices[index]), targets[index], - targetStorageTypes[index]); + targetStorageTypes[index], targetStorageIds[index]); } ByteBuffer allocateWriteBuffer() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java index 39d9547c6c..efe222f6ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java @@ -113,8 +113,8 @@ public Configuration getConf() { new RoundRobinVolumeChoosingPolicy(); @Override - public V chooseVolume(List volumes, - long replicaSize) throws IOException { + public V chooseVolume(List volumes, long replicaSize, String storageId) + throws IOException { if (volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } @@ -125,19 +125,20 @@ public V chooseVolume(List volumes, storageType.ordinal() : StorageType.DEFAULT.ordinal(); synchronized (syncLocks[index]) { - return doChooseVolume(volumes, replicaSize); + return doChooseVolume(volumes, replicaSize, storageId); } } - private V doChooseVolume(final List volumes, - long replicaSize) throws IOException { + private V doChooseVolume(final List volumes, long replicaSize, + String storageId) throws IOException { AvailableSpaceVolumeList volumesWithSpaces = new AvailableSpaceVolumeList(volumes); if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) { // If they're actually not too far out of whack, fall back on pure round // robin. - V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize); + V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize, + storageId); if (LOG.isDebugEnabled()) { LOG.debug("All volumes are within the configured free space balance " + "threshold. Selecting " + volume + " for write of block size " + @@ -165,7 +166,7 @@ private V doChooseVolume(final List volumes, if (mostAvailableAmongLowVolumes < replicaSize || random.nextFloat() < scaledPreferencePercent) { volume = roundRobinPolicyHighAvailable.chooseVolume( - highAvailableVolumes, replicaSize); + highAvailableVolumes, replicaSize, storageId); if (LOG.isDebugEnabled()) { LOG.debug("Volumes are imbalanced. Selecting " + volume + " from high available space volumes for write of block size " @@ -173,7 +174,7 @@ private V doChooseVolume(final List volumes, } } else { volume = roundRobinPolicyLowAvailable.chooseVolume( - lowAvailableVolumes, replicaSize); + lowAvailableVolumes, replicaSize, storageId); if (LOG.isDebugEnabled()) { LOG.debug("Volumes are imbalanced. 
Selecting " + volume + " from low available space volumes for write of block size " @@ -266,7 +267,8 @@ public List getVolumesWithHighAvailableSpace() { /** * Used so that we only check the available space on a given volume once, at - * the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}. + * the beginning of + * {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}. */ private class AvailableSpaceVolumePair { private final V volume; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 9e979f7363..d7e29cf4c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -318,7 +318,7 @@ ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - ReplicaHandler createTemporary(StorageType storageType, + ReplicaHandler createTemporary(StorageType storageType, String storageId, ExtendedBlock b) throws IOException; /** @@ -328,7 +328,7 @@ ReplicaHandler createTemporary(StorageType storageType, * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - ReplicaHandler createRbw(StorageType storageType, + ReplicaHandler createRbw(StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException; /** @@ -623,7 +623,7 @@ void onCompleteLazyPersist(String bpId, long blockId, * Move block from one storage to another storage */ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block, - StorageType targetStorageType) throws IOException; + StorageType targetStorageType, String storageId) throws IOException; /** * Set a block to be pinned on this datanode so that it cannot be moved diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java index 9474b92d99..b9bcf1ff27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java @@ -50,7 +50,7 @@ public RoundRobinVolumeChoosingPolicy() { } @Override - public V chooseVolume(final List volumes, long blockSize) + public V chooseVolume(final List volumes, long blockSize, String storageId) throws IOException { if (volumes.size() < 1) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java index 62b1e759ce..8cbc0587b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeChoosingPolicy.java @@ -36,8 +36,11 @@ public interface 
VolumeChoosingPolicy { * * @param volumes - a list of available volumes. * @param replicaSize - the size of the replica for which a volume is sought. + * @param storageId - the storage id of the Volume nominated by the namenode. + * This can usually be ignored by the VolumeChoosingPolicy. * @return the chosen volume. * @throws IOException when disks are unavailable or are full. */ - public V chooseVolume(List volumes, long replicaSize) throws IOException; + V chooseVolume(List volumes, long replicaSize, String storageId) + throws IOException; } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 169e0e6118..9a5002ab7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -927,7 +927,8 @@ static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta, */ @Override public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, - StorageType targetStorageType) throws IOException { + StorageType targetStorageType, String targetStorageId) + throws IOException { ReplicaInfo replicaInfo = getReplicaInfo(block); if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new ReplicaNotFoundException( @@ -952,7 +953,8 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, FsVolumeReference volumeRef = null; try (AutoCloseableLock lock = datasetLock.acquire()) { - volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes()); + volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId, + block.getNumBytes()); } try { moveBlock(block, replicaInfo, volumeRef); @@ -1298,11 +1300,11 @@ public Replica recoverClose(ExtendedBlock b, long newGS, } } } - + @Override // FsDatasetSpi public ReplicaHandler createRbw( - StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) - throws IOException { + StorageType storageType, String storageId, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); @@ -1335,7 +1337,7 @@ public ReplicaHandler createRbw( } if (ref == null) { - ref = volumes.getNextVolume(storageType, b.getNumBytes()); + ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes()); } FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); @@ -1503,7 +1505,8 @@ public ReplicaInPipeline convertTemporaryToRbw( @Override // FsDatasetSpi public ReplicaHandler createTemporary( - StorageType storageType, ExtendedBlock b) throws IOException { + StorageType storageType, String storageId, ExtendedBlock b) + throws IOException { long startTimeMs = Time.monotonicNow(); long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); ReplicaInfo lastFoundReplicaInfo = null; @@ -1516,7 +1519,7 @@ public ReplicaHandler createTemporary( invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); } FsVolumeReference ref = - volumes.getNextVolume(storageType, b.getNumBytes()); + volumes.getNextVolume(storageType, storageId, b.getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); ReplicaInPipeline newReplicaInfo; try { @@ -2899,7 +2902,7 @@ private boolean 
saveNextReplica() { replicaInfo.getVolume().isTransientStorage()) { // Pick a target volume to persist the block. targetReference = volumes.getNextVolume( - StorageType.DEFAULT, replicaInfo.getNumBytes()); + StorageType.DEFAULT, null, replicaInfo.getNumBytes()); targetVolume = (FsVolumeImpl) targetReference.getVolume(); ramDiskReplicaTracker.recordStartLazyPersist( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index e7f02287f3..75baf848af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -81,10 +81,11 @@ List getVolumes() { return Collections.unmodifiableList(volumes); } - private FsVolumeReference chooseVolume(List list, long blockSize) - throws IOException { + private FsVolumeReference chooseVolume(List list, + long blockSize, String storageId) throws IOException { while (true) { - FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize); + FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize, + storageId); try { return volume.obtainReference(); } catch (ClosedChannelException e) { @@ -100,18 +101,20 @@ private FsVolumeReference chooseVolume(List list, long blockSize) * Get next volume. * * @param blockSize free space needed on the volume - * @param storageType the desired {@link StorageType} + * @param storageType the desired {@link StorageType} + * @param storageId the storage id which may or may not be used by + * the VolumeChoosingPolicy. * @return next volume to store the block in. 
*/ - FsVolumeReference getNextVolume(StorageType storageType, long blockSize) - throws IOException { + FsVolumeReference getNextVolume(StorageType storageType, String storageId, + long blockSize) throws IOException { final List list = new ArrayList<>(volumes.size()); for(FsVolumeImpl v : volumes) { if (v.getStorageType() == storageType) { list.add(v); } } - return chooseVolume(list, blockSize); + return chooseVolume(list, blockSize, storageId); } /** @@ -129,7 +132,7 @@ FsVolumeReference getNextTransientVolume(long blockSize) throws IOException { list.add(v); } } - return chooseVolume(list, blockSize); + return chooseVolume(list, blockSize, null); } long getDfsUsed() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 74cdeaed44..c98a336990 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1018,7 +1018,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, // send the request new Sender(out).transferBlock(b, new Token(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, + new String[0]); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index b6884daeea..3a8fb59154 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1448,12 +1448,33 @@ public void testStorageTypeCheckAccess(){ testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, StorageType.ARCHIVE}, new StorageType[]{StorageType.DISK}, false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.DISK, StorageType.SSD}, + new StorageType[]{StorageType.SSD}, + true); + + testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK}, + new StorageType[]{StorageType.DISK}, false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK}, + false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK}, + false); + } private void testStorageTypeCheckAccessResult(StorageType[] requested, StorageType[] allowed, boolean expAccess) { try { - BlockTokenSecretManager.checkAccess(requested, allowed); + BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes"); if (!expAccess) { fail("No expected access with allowed StorageTypes " + Arrays.toString(allowed) + " and requested StorageTypes " @@ -1467,4 +1488,56 @@ private void testStorageTypeCheckAccessResult(StorageType[] requested, } } } + + @Test + public void testStorageIDCheckAccess() { + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1"}, + new String[]{"DN1-Storage1"}, true); + + testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"}, + new String[]{"DN1-Storage1"}, + true); + + 
testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"}, + new String[]{"DN1-Storage1", "DN1-Storage2"}, false); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1", "DN1-Storage2"}, + new String[]{"DN1-Storage1"}, true); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1", "DN1-Storage2"}, + new String[]{"DN2-Storage1"}, false); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage2", "DN2-Storage2"}, + new String[]{"DN1-Storage1", "DN2-Storage1"}, false); + + testStorageIDCheckAccessResult(new String[0], new String[0], false); + + testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"}, + true); + + testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0], + false); + } + + private void testStorageIDCheckAccessResult(String[] requested, + String[] allowed, boolean expAccess) { + try { + BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs"); + if (!expAccess) { + fail("No expected access with allowed StorageIDs" + + Arrays.toString(allowed) + " and requested StorageIDs" + + Arrays.toString(requested)); + } + } catch (SecretManager.InvalidToken e) { + if (expAccess) { + fail("Expected access with allowed StorageIDs " + + Arrays.toString(allowed) + " and requested StorageIDs" + + Arrays.toString(requested)); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 3f4fe28836..7a2ac1ba3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -559,6 +559,7 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], new StorageType[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, - checksum, CachingStrategy.newDefaultStrategy(), false, false, null); + checksum, CachingStrategy.newDefaultStrategy(), false, false, + null, null, new String[0]); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java index 5c1b38f114..e159914797 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java @@ -98,11 +98,11 @@ public FsDatasetChecker(DataStorage storage, Configuration conf) { * correctly propagate the hint to FsDatasetSpi. 
*/ @Override - public synchronized ReplicaHandler createRbw( - StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) + public synchronized ReplicaHandler createRbw(StorageType storageType, + String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH)); - return super.createRbw(storageType, b, allowLazyPersist); + return super.createRbw(storageType, storageId, b, allowLazyPersist); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index e98207f18c..747f2952d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -151,7 +151,7 @@ public GetReplicaVisibleLengthResponseProto answer( assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()), BlockTokenIdentifier.AccessMode.WRITE, - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, null); result = id.getBlockId(); } return GetReplicaVisibleLengthResponseProto.newBuilder() @@ -160,11 +160,11 @@ public GetReplicaVisibleLengthResponseProto answer( } private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm, - ExtendedBlock block, - EnumSet accessModes, - StorageType... storageTypes) throws IOException { + ExtendedBlock block, EnumSet accessModes, + StorageType[] storageTypes, String[] storageIds) + throws IOException { Token token = sm.generateToken(block, accessModes, - storageTypes); + storageTypes, storageIds); BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); @@ -178,29 +178,28 @@ private void testWritable(boolean enableProtobuf) throws Exception { enableProtobuf); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block2, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); // We must be backwards compatible when adding storageType TestWritable.testWritable(generateTokenId(sm, block3, - EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - (StorageType[]) null)); + EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null)); TestWritable.testWritable(generateTokenId(sm, block3, 
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.EMPTY_ARRAY)); + StorageType.EMPTY_ARRAY, null)); } @Test @@ -215,35 +214,36 @@ public void testWritableProtobuf() throws Exception { private static void checkAccess(BlockTokenSecretManager m, Token t, ExtendedBlock blk, - BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken { - m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT }); + BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes, + String[] storageIds) throws SecretManager.InvalidToken { + m.checkAccess(t, null, blk, mode, storageTypes, storageIds); } private void tokenGenerationAndVerification(BlockTokenSecretManager master, - BlockTokenSecretManager slave, StorageType... storageTypes) - throws Exception { + BlockTokenSecretManager slave, StorageType[] storageTypes, + String[] storageIds) throws Exception { // single-mode tokens for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode .values()) { // generated by master Token token1 = master.generateToken(block1, - EnumSet.of(mode), storageTypes); - checkAccess(master, token1, block1, mode); - checkAccess(slave, token1, block1, mode); + EnumSet.of(mode), storageTypes, storageIds); + checkAccess(master, token1, block1, mode, storageTypes, storageIds); + checkAccess(slave, token1, block1, mode, storageTypes, storageIds); // generated by slave Token token2 = slave.generateToken(block2, - EnumSet.of(mode), storageTypes); - checkAccess(master, token2, block2, mode); - checkAccess(slave, token2, block2, mode); + EnumSet.of(mode), storageTypes, storageIds); + checkAccess(master, token2, block2, mode, storageTypes, storageIds); + checkAccess(slave, token2, block2, mode, storageTypes, storageIds); } // multi-mode tokens Token mtoken = master.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - storageTypes); + storageTypes, storageIds); for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode .values()) { - checkAccess(master, mtoken, block3, mode); - checkAccess(slave, mtoken, block3, mode); + checkAccess(master, mtoken, block3, mode, storageTypes, storageIds); + checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds); } } @@ -259,18 +259,18 @@ private void testBlockTokenSecretManager(boolean enableProtobuf) ExportedBlockKeys keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); // key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); } @Test @@ -315,7 +315,7 @@ private void testBlockTokenRpc(boolean enableProtobuf) throws Exception { enableProtobuf); Token token = sm.generateToken(block3, 
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final Server server = createMockDatanode(sm, token, conf); @@ -365,7 +365,7 @@ private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception { enableProtobuf); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final Server server = createMockDatanode(sm, token, conf); server.start(); @@ -451,19 +451,23 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf) ExportedBlockKeys keys = masterHandler.exportKeys(); bpMgr.addKeys(bpid, keys); + String[] storageIds = new String[] {"DS-9001"}; tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, storageIds); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); // Test key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, storageIds); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); keys = masterHandler.exportKeys(); bpMgr.addKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"}); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); } } @@ -540,7 +544,7 @@ public void testLegacyBlockTokenBytesIsLegacy() throws IOException { useProto); Token token = sm.generateToken(block1, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final byte[] tokenBytes = token.getIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); @@ -605,7 +609,7 @@ public void testProtobufBlockTokenBytesIsProtobuf() throws IOException { useProto); Token token = sm.generateToken(block1, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.EMPTY_ARRAY); + StorageType.EMPTY_ARRAY, new String[0]); final byte[] tokenBytes = token.getIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); @@ -699,7 +703,8 @@ public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws */ BlockTokenIdentifier identifier = new BlockTokenIdentifier("user", "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true); + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new String[] {"fake-storage-id"}, true); Calendar cal = new GregorianCalendar(); cal.set(2017, 1, 9, 0, 12, 35); long datetime = cal.getTimeInMillis(); @@ -749,7 +754,8 @@ private void testBlockTokenSerialization(boolean useProto) throws new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE}; BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool", - 123, accessModes, storageTypes, useProto); 
+ 123, accessModes, storageTypes, new String[] {"fake-storage-id"}, + useProto); ident.setExpiryDate(1487080345L); BlockTokenIdentifier ret = writeAndReadBlockToken(ident); assertEquals(ret.getExpiryDate(), 1487080345L); @@ -760,6 +766,7 @@ private void testBlockTokenSerialization(boolean useProto) throws assertEquals(ret.getAccessModes(), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); assertArrayEquals(ret.getStorageTypes(), storageTypes); + assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"}); } @Test @@ -767,5 +774,4 @@ public void testBlockTokenSerialization() throws IOException { testBlockTokenSerialization(false); testBlockTokenSerialization(true); } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 6810a0b3db..c9ff57221f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -389,7 +389,7 @@ public void blockReport_04() throws IOException { // Create a bogus new block which will not be present on the namenode. ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false); + dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index cd3befd1b8..18b4922b51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1023,21 +1023,22 @@ public synchronized ReplicaHandler recoverRbw( @Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( - StorageType storageType, ExtendedBlock b, + StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - return createTemporary(storageType, b); + return createTemporary(storageType, storageId, b); } @Override // FsDatasetSpi public synchronized ReplicaHandler createTemporary( - StorageType storageType, ExtendedBlock b) throws IOException { + StorageType storageType, String storageId, ExtendedBlock b) + throws IOException { if (isValidBlock(b)) { - throw new ReplicaAlreadyExistsException("Block " + b + - " is valid, and cannot be written to."); - } + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } if (isValidRbw(b)) { - throw new ReplicaAlreadyExistsException("Block " + b + - " is being written, and cannot be written to."); + throw new ReplicaAlreadyExistsException("Block " + b + + " is being written, and cannot be written to."); } final Map map = getMap(b.getBlockPoolId()); BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); @@ -1419,7 +1420,7 @@ public void onFailLazyPersist(String bpId, long blockId) { @Override public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, - 
StorageType targetStorageType) throws IOException { + StorageType targetStorageType, String storageId) throws IOException { // TODO Auto-generated method stub return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 579252bd38..311d5a67c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -647,7 +647,7 @@ public void testNoReplicaUnderRecovery() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(StorageType.DEFAULT, block, false); + dn.data.createRbw(StorageType.DEFAULT, null, block, false); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = recoveryWorker.new RecoveryTaskContiguous(rBlock); try { @@ -673,7 +673,7 @@ public void testNotMatchedReplicaID() throws IOException { LOG.debug("Running " + GenericTestUtils.getMethodName()); } ReplicaInPipeline replicaInfo = dn.data.createRbw( - StorageType.DEFAULT, block, false).getReplica(); + StorageType.DEFAULT, null, block, false).getReplica(); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, @@ -972,7 +972,7 @@ public void run() { // Register this thread as the writer for the recoveringBlock. LOG.debug("slowWriter creating rbw"); ReplicaHandler replicaHandler = - spyDN.data.createRbw(StorageType.DISK, block, false); + spyDN.data.createRbw(StorageType.DISK, null, block, false); replicaHandler.close(); LOG.debug("slowWriter created rbw"); // Tell the parent thread to start progressing. 
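The TestBlockToken changes above exercise the new four-argument generateToken and the storage-ID-aware checkAccess. The sketch below is illustrative only (it is not part of this patch; the secret manager and block are taken as given, and the "DS-1234" storage ID is made up) and shows the expected call pattern for a token bound to a single storage ID:

import java.util.EnumSet;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;

/** Illustrative sketch: issue and verify a write token bound to one storage ID. */
final class StorageIdTokenSketch {
  static void issueAndVerify(BlockTokenSecretManager sm, ExtendedBlock blk)
      throws SecretManager.InvalidToken {
    Token<BlockTokenIdentifier> token = sm.generateToken(blk,
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
        new StorageType[]{StorageType.DISK},
        new String[]{"DS-1234"});

    // Allowed: the requested storage ID is among the IDs baked into the token.
    sm.checkAccess(token, null, blk, BlockTokenIdentifier.AccessMode.WRITE,
        new StorageType[]{StorageType.DISK}, new String[]{"DS-1234"});

    // Also expected to pass: callers that send no storage IDs at all, which is
    // the compatibility case the tests above cover with null/empty ID arrays.
    sm.checkAccess(token, null, blk, BlockTokenIdentifier.AccessMode.WRITE,
        new StorageType[]{StorageType.DISK}, null);
  }
}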
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index f811bd85da..8992d47d52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -394,7 +394,7 @@ private boolean replaceBlock( DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, targetStorageType, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), - sourceProxy); + sourceProxy, null); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index b2bfe4998a..8fda664897 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@ -129,7 +129,7 @@ private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist) DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0), CachingStrategy.newDefaultStrategy(), lazyPersist, - false, null); + false, null, null, new String[0]); } // Helper functions to setup the mock objects. @@ -151,7 +151,7 @@ private static DataXceiver makeStubDataXceiver( any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(), anyString(), any(DatanodeInfo.class), any(DataNode.class), any(DataChecksum.class), any(CachingStrategy.class), - captor.capture(), anyBoolean()); + captor.capture(), anyBoolean(), any(String.class)); doReturn(mock(DataOutputStream.class)).when(xceiverSpy) .getBufferedOutputStream(); return xceiverSpy; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index cd86720ef8..38e4287020 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -167,7 +167,8 @@ public void testReplicationError() throws Exception { BlockTokenSecretManager.DUMMY_TOKEN, "", new DatanodeInfo[0], new StorageType[0], null, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, - checksum, CachingStrategy.newDefaultStrategy(), false, false, null); + checksum, CachingStrategy.newDefaultStrategy(), false, false, + null, null, new String[0]); out.flush(); // close the connection before sending the content of the block @@ -274,7 +275,7 @@ public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException { dn1.getDatanodeId()); dn0.transferBlock(block, new DatanodeInfo[]{dnd1}, - new StorageType[]{StorageType.DISK}); + new StorageType[]{StorageType.DISK}, new String[0]); // Sleep for 1 second so the DataTrasnfer daemon can start transfer. 
try { Thread.sleep(1000); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 4e724bc7cb..2e69595d4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -81,7 +81,7 @@ static int addSomeBlocks(SimulatedFSDataset fsdataset, long startingBlockId, // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipeline bInfo = fsdataset.createRbw( - StorageType.DEFAULT, b, false).getReplica(); + StorageType.DEFAULT, null, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { @@ -368,7 +368,7 @@ public void run() { ExtendedBlock block = new ExtendedBlock(newbpid,1); try { // it will throw an exception if the block pool is not found - fsdataset.createTemporary(StorageType.DEFAULT, block); + fsdataset.createTemporary(StorageType.DEFAULT, null, block); } catch (IOException ioe) { // JUnit does not capture exception in non-main thread, // so cache it and then let main thread throw later. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 62ef731e32..2e439d68ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -138,14 +138,15 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, } @Override - public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b) + public ReplicaHandler createTemporary(StorageType t, String i, + ExtendedBlock b) throws IOException { return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } @Override - public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf) - throws IOException { + public ReplicaHandler createRbw(StorageType storageType, String id, + ExtendedBlock b, boolean tf) throws IOException { return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } @@ -332,7 +333,8 @@ public void onFailLazyPersist(String bpId, long blockId) { } @Override - public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException { + public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, + StorageType targetStorageType, String storageId) throws IOException { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java index 9414a0e8a9..24a43e7a91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java @@ -89,10 +89,12 @@ public void testTwoUnbalancedVolumes() throws Exception { // than the threshold of 1MB. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3); - - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -115,21 +117,29 @@ public void testThreeUnbalancedVolumes() throws Exception { // Third volume, again with 3MB free space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3); - + // We should alternate assigning between the two volumes with a lot of free // space. initPolicy(policy, 1.0f); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); // All writes should be assigned to the volume with the least free space. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -156,22 +166,30 @@ public void testFourUnbalancedVolumes() throws Exception { // Fourth volume, again with 3MB free space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3); - + // We should alternate assigning between the two volumes with a lot of free // space. 
initPolicy(policy, 1.0f); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100, + null)); // We should alternate assigning between the two volumes with less free // space. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -190,13 +208,14 @@ public void testNotEnoughSpaceOnSelectedVolume() throws Exception { // than the threshold of 1MB. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3); - + // All writes should be assigned to the volume with the least free space. // However, if the volume with the least free space doesn't have enough // space to accept the replica size, and another volume does have enough // free space, that should be chosen instead. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, + 1024L * 1024L * 2, null)); } @Test(timeout=60000) @@ -220,10 +239,11 @@ public void testAvailableSpaceChanges() throws Exception { .thenReturn(1024L * 1024L * 3) .thenReturn(1024L * 1024L * 3) .thenReturn(1024L * 1024L * 1); // After the third check, return 1MB. - + // Should still be able to get a volume for the replica even though the // available space on the second volume changed. 
- Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, + 100, null)); } @Test(timeout=60000) @@ -271,12 +291,12 @@ public void doRandomizedTest(float preferencePercent, int lowSpaceVolumes, Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3); volumes.add(volume); } - + initPolicy(policy, preferencePercent); long lowAvailableSpaceVolumeSelected = 0; long highAvailableSpaceVolumeSelected = 0; for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) { - FsVolumeSpi volume = policy.chooseVolume(volumes, 100); + FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null); for (int j = 0; j < volumes.size(); j++) { // Note how many times the first low available volume was selected if (volume == volumes.get(j) && j == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java index 9b3047f949..44e2a30e55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java @@ -50,20 +50,21 @@ public static void testRR(VolumeChoosingPolicy policy) // Second volume, with 200 bytes of space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); - + // Test two rounds of round-robin choosing - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null)); // The first volume has only 100L space, so the policy should // wisely choose the second one in case we ask for more. - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150, + null)); // Fail if no volume can be chosen? try { - policy.chooseVolume(volumes, Long.MAX_VALUE); + policy.chooseVolume(volumes, Long.MAX_VALUE, null); Assert.fail(); } catch (IOException e) { // Passed. 
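The volume-choosing tests above pass null for the new storageId argument because the built-in round-robin and available-space policies ignore the hint, as the VolumeChoosingPolicy javadoc earlier in this patch notes. A pluggable policy could instead honor the namenode's nomination. The class below is only an illustrative sketch (hypothetical, not part of this patch) that prefers the nominated volume when it is present and has room, and otherwise falls back to plain round-robin:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;

/** Illustrative sketch of a policy that honors the namenode-nominated storage ID. */
public class StorageIdAwareVolumeChoosingPolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V> {

  private final RoundRobinVolumeChoosingPolicy<V> fallback =
      new RoundRobinVolumeChoosingPolicy<>();

  @Override
  public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
      throws IOException {
    if (storageId != null) {
      for (V volume : volumes) {
        // FsVolumeSpi#getStorageID() returns the volume's storage UUID.
        if (storageId.equals(volume.getStorageID())
            && volume.getAvailable() >= replicaSize) {
          return volume;
        }
      }
    }
    // No hint, or the nominated volume is absent or full: behave as before.
    return fallback.chooseVolume(volumes, replicaSize, storageId);
  }
}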
@@ -93,7 +94,7 @@ public static void testRRPolicyExceptionMessage( int blockSize = 700; try { - policy.chooseVolume(volumes, blockSize); + policy.chooseVolume(volumes, blockSize, null); Assert.fail("expected to throw DiskOutOfSpaceException"); } catch(DiskOutOfSpaceException e) { Assert.assertEquals("Not returnig the expected message", @@ -137,21 +138,21 @@ public static void testRRPolicyWithStorageTypes( Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L); Assert.assertEquals(diskVolumes.get(0), - policy.chooseVolume(diskVolumes, 0)); + policy.chooseVolume(diskVolumes, 0, null)); // Independent Round-Robin for different storage type Assert.assertEquals(ssdVolumes.get(0), - policy.chooseVolume(ssdVolumes, 0)); + policy.chooseVolume(ssdVolumes, 0, null)); // Take block size into consideration Assert.assertEquals(ssdVolumes.get(0), - policy.chooseVolume(ssdVolumes, 150L)); + policy.chooseVolume(ssdVolumes, 150L, null)); Assert.assertEquals(diskVolumes.get(1), - policy.chooseVolume(diskVolumes, 0)); + policy.chooseVolume(diskVolumes, 0, null)); Assert.assertEquals(diskVolumes.get(0), - policy.chooseVolume(diskVolumes, 50L)); + policy.chooseVolume(diskVolumes, 50L, null)); try { - policy.chooseVolume(diskVolumes, 200L); + policy.chooseVolume(diskVolumes, 200L, null); Assert.fail("Should throw an DiskOutOfSpaceException before this!"); } catch (DiskOutOfSpaceException e) { // Pass. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 905c3f0c7f..32935613aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -259,7 +259,7 @@ public void testRemoveVolumes() throws IOException { String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length]; ExtendedBlock eb = new ExtendedBlock(bpid, i); try (ReplicaHandler replica = - dataset.createRbw(StorageType.DEFAULT, eb, false)) { + dataset.createRbw(StorageType.DEFAULT, null, eb, false)) { } } final String[] dataDirs = @@ -566,7 +566,7 @@ public void run() { class ResponderThread extends Thread { public void run() { try (ReplicaHandler replica = dataset - .createRbw(StorageType.DEFAULT, eb, false)) { + .createRbw(StorageType.DEFAULT, null, eb, false)) { LOG.info("CreateRbw finished"); startFinalizeLatch.countDown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 83c15caf63..ee3a79f940 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -101,7 +101,7 @@ public Boolean get() { } for (int i = 0; i < 10; i++) { try (FsVolumeReference ref = - volumeList.getNextVolume(StorageType.DEFAULT, 128)) { + volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) { // volume No.2 will not be chosen. 
assertNotEquals(ref.getVolume(), volumes.get(1)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index da53cae714..11525ed8c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -353,7 +353,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false); Assert.fail("Should not have created a replica that's already " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { @@ -371,7 +371,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false); Assert.fail("Should not have created a replica that had created as " + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { @@ -381,7 +381,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw 0L, blocks[RBW].getNumBytes()); // expect to be successful try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { @@ -397,7 +397,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { @@ -413,7 +413,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { @@ -430,49 +430,49 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA)); } - dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false); } private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]); Assert.fail("Should not have created a temporary replica that was " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had created as" + "temporary " + 
blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { } - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); Assert.fail("Should not have created a replica that had already been " + "created " + blocks[NON_EXISTENT]); } catch (Exception e) { @@ -485,7 +485,8 @@ private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) blocks[NON_EXISTENT].setGenerationStamp(newGenStamp); try { ReplicaInPipeline replicaInfo = - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica(); + dataSet.createTemporary(StorageType.DEFAULT, null, + blocks[NON_EXISTENT]).getReplica(); Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp); Assert.assertTrue( replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java new file mode 100644 index 0000000000..e0f7426804 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.server.blockmanagement.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; +import org.apache.hadoop.net.Node; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test to ensure that the StorageType and StorageID sent from Namenode + * to DFSClient are respected. + */ +public class TestNamenodeStorageDirectives { + public static final Logger LOG = + LoggerFactory.getLogger(TestNamenodeStorageDirectives.class); + + private static final int BLOCK_SIZE = 512; + + private MiniDFSCluster cluster; + + @After + public void tearDown() { + shutdown(); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes, + int storagePerDataNode, StorageType[][] storageTypes) + throws IOException { + startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode, + storageTypes, RoundRobinVolumeChoosingPolicy.class, + BlockPlacementPolicyDefault.class); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes, + int storagePerDataNode, StorageType[][] storageTypes, + Class volumeChoosingPolicy, + Class blockPlacementPolicy) throws + IOException { + shutdown(); + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + + /* + * Lower the DN heartbeat, DF rate, and recheck interval to one second + * so state about failures and datanode death propagates faster. 
+ */ + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1000); + /* Allow 1 volume failure */ + conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); + conf.setClass( + DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, + volumeChoosingPolicy, VolumeChoosingPolicy.class); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + blockPlacementPolicy, BlockPlacementPolicy.class); + + MiniDFSNNTopology nnTopology = + MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(nnTopology) + .numDataNodes(numDataNodes) + .storagesPerDatanode(storagePerDataNode) + .storageTypes(storageTypes) + .build(); + cluster.waitActive(); + } + + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private void createFile(Path path, int numBlocks, short replicateFactor) + throws IOException, InterruptedException, TimeoutException { + createFile(0, path, numBlocks, replicateFactor); + } + + private void createFile(int fsIdx, Path path, int numBlocks, + short replicateFactor) + throws IOException, TimeoutException, InterruptedException { + final int seed = 0; + final DistributedFileSystem fs = cluster.getFileSystem(fsIdx); + DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks, + replicateFactor, seed); + DFSTestUtil.waitReplication(fs, path, replicateFactor); + } + + private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks, + StorageType storageType) throws IOException { + MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0]; + InetSocketAddress addr = info.nameNode.getServiceRpcAddress(); + assert addr.getPort() != 0; + DFSClient client = new DFSClient(addr, cluster.getConfiguration(0)); + + FileSystem fs = cluster.getFileSystem(); + + if (!fs.exists(path)) { + LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path); + return false; + } + long fileLength = client.getFileInfo(path.toString()).getLen(); + int foundBlocks = 0; + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + for (StorageType st : locatedBlock.getStorageTypes()) { + if (st == storageType) { + foundBlocks++; + } + } + } + + LOG.info("Found {}/{} blocks on StorageType {}", + foundBlocks, numBlocks, storageType); + final boolean isValid = foundBlocks >= numBlocks; + return isValid; + } + + private void testStorageTypes(StorageType[][] storageTypes, + String storagePolicy, StorageType[] expectedStorageTypes, + StorageType[] unexpectedStorageTypes) throws ReconfigurationException, + InterruptedException, TimeoutException, IOException { + final int numDataNodes = storageTypes.length; + final int storagePerDataNode = storageTypes[0].length; + startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes); + cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy); + Path testFile = new Path("/test"); + final short replFactor = 2; + final int numBlocks = 10; + createFile(testFile, numBlocks, replFactor); + + for (StorageType storageType: expectedStorageTypes) { + assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks, + storageType)); + } + + for (StorageType storageType: 
unexpectedStorageTypes) { + assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks, + storageType)); + } + } + + /** + * Verify that writing to SSD and DISK will write to the correct Storage + * Types. + * @throws IOException + */ + @Test(timeout=60000) + public void testTargetStorageTypes() throws ReconfigurationException, + InterruptedException, TimeoutException, IOException { + // DISK and not anything else. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "ONE_SSD", + new StorageType[]{StorageType.SSD, StorageType.DISK}, + new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE}); + // only on SSD. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "ALL_SSD", + new StorageType[]{StorageType.SSD}, + new StorageType[]{StorageType.RAM_DISK, StorageType.DISK, + StorageType.ARCHIVE}); + // only on SSD. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK, StorageType.DISK}}, + "ALL_SSD", + new StorageType[]{StorageType.SSD}, + new StorageType[]{StorageType.RAM_DISK, StorageType.DISK, + StorageType.ARCHIVE}); + + // DISK and not anything else. + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "HOT", + new StorageType[]{StorageType.DISK}, + new StorageType[] {StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}); + + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + "WARM", + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD}); + + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + "COLD", + new StorageType[]{StorageType.ARCHIVE}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.DISK}); + + // We wait for Lasy Persist to write to disk. + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "LAZY_PERSIST", + new StorageType[]{StorageType.DISK}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}); + } + + /** + * A VolumeChoosingPolicy test stub used to verify that the storageId passed + * in is indeed in the list of volumes. 
+ * @param <V> the volume type handled by the policy
+ */
+ private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
+ extends RoundRobinVolumeChoosingPolicy<V> {
+ static String expectedStorageId;
+
+ @Override
+ public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
+ throws IOException {
+ assertEquals(expectedStorageId, storageId);
+ return super.chooseVolume(volumes, replicaSize, storageId);
+ }
+ }
+
+ private static class TestBlockPlacementPolicy
+ extends BlockPlacementPolicyDefault {
+ static DatanodeStorageInfo[] dnStorageInfosToReturn;
+
+ @Override
+ public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+ Node writer, List<DatanodeStorageInfo> chosenNodes,
+ boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
+ final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
+ return dnStorageInfosToReturn;
+ }
+ }
+
+ private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
+ throws UnregisteredNodeException {
+ if (cluster == null) {
+ return null;
+ }
+ DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
+ DatanodeManager dnManager = cluster.getNamesystem()
+ .getBlockManager().getDatanodeManager();
+ return dnManager.getDatanode(dnId).getStorageInfos()[0];
+ }
+
+ @Test(timeout=60000)
+ public void testStorageIDBlockPlacementSpecific()
+ throws ReconfigurationException, InterruptedException, TimeoutException,
+ IOException {
+ final StorageType[][] storageTypes = {
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ };
+ final int numDataNodes = storageTypes.length;
+ final int storagePerDataNode = storageTypes[0].length;
+ startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
+ TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
+ Path testFile = new Path("/test");
+ final short replFactor = 1;
+ final int numBlocks = 10;
+ DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
+ TestBlockPlacementPolicy.dnStorageInfosToReturn =
+ new DatanodeStorageInfo[] {dnInfoToUse};
+ TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
+ // File creation invokes both the BlockPlacementPolicy and the
+ // VolumeChoosingPolicy, and verifies that the storage IDs match.
+ createFile(testFile, numBlocks, replFactor);
+ }
+}
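
Note on the new chooseVolume(volumes, replicaSize, storageId) contract exercised by the tests above: the sketch below shows one way a datanode-side policy could honour the storage ID hint chosen by the namenode. It is illustrative only and not part of this patch; the class name StorageIdAwareVolumeChoosingPolicy is hypothetical, and it assumes FsVolumeSpi exposes getStorageID() and getAvailable() as FsVolumeImpl does.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;

    /** Illustrative sketch only; not part of HDFS-9807. */
    public class StorageIdAwareVolumeChoosingPolicy<V extends FsVolumeSpi>
        extends RoundRobinVolumeChoosingPolicy<V> {

      @Override
      public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
          throws IOException {
        if (storageId != null) {
          for (V volume : volumes) {
            // Prefer the volume the namenode selected, provided it still has
            // room for the replica. getStorageID()/getAvailable() are assumed
            // to behave as they do for FsVolumeImpl.
            if (storageId.equals(volume.getStorageID())
                && volume.getAvailable() >= replicaSize) {
              return volume;
            }
          }
        }
        // No hint, or the hinted volume is unusable: fall back to round-robin.
        return super.chooseVolume(volumes, replicaSize, storageId);
      }
    }

As the updated tests demonstrate, callers may pass null for storageId, in which case the built-in policies behave exactly as before the patch, so existing VolumeChoosingPolicy implementations continue to work unchanged.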