HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs

Chris Douglas 2017-05-05 12:01:26 -07:00
parent 4e6bbd049d
commit a3954ccab1
48 changed files with 903 additions and 297 deletions

View File

@@ -174,10 +174,12 @@ private class StreamerStreams implements java.io.Closeable {
     void sendTransferBlock(final DatanodeInfo[] targets,
         final StorageType[] targetStorageTypes,
+        final String[] targetStorageIDs,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //send the TRANSFER_BLOCK request
       new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
-          dfsClient.clientName, targets, targetStorageTypes);
+          dfsClient.clientName, targets, targetStorageTypes,
+          targetStorageIDs);
       out.flush();
       //ack
       BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -1367,9 +1369,11 @@ private void addDatanode2ExistingPipeline() throws IOException {
       final DatanodeInfo src = original[tried % original.length];
       final DatanodeInfo[] targets = {nodes[d]};
       final StorageType[] targetStorageTypes = {storageTypes[d]};
+      final String[] targetStorageIDs = {storageIDs[d]};
       try {
-        transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+        transfer(src, targets, targetStorageTypes, targetStorageIDs,
+            lb.getBlockToken());
       } catch (IOException ioe) {
         DFSClient.LOG.warn("Error transferring data from " + src + " to " +
             nodes[d] + ": " + ioe.getMessage());
@@ -1400,6 +1404,7 @@ private long computeTransferReadTimeout() {
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
       final StorageType[] targetStorageTypes,
+      final String[] targetStorageIDs,
       final Token<BlockTokenIdentifier> blockToken)
       throws IOException {
     //transfer replica to the new datanode
@@ -1412,7 +1417,8 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
         streams = new StreamerStreams(src, writeTimeout, readTimeout,
             blockToken);
-        streams.sendTransferBlock(targets, targetStorageTypes, blockToken);
+        streams.sendTransferBlock(targets, targetStorageTypes,
+            targetStorageIDs, blockToken);
         return;
       } catch (InvalidEncryptionKeyException e) {
         policy.recordFailure(e);
@@ -1440,11 +1446,12 @@ private void setupPipelineForAppendOrRecovery() throws IOException {
       streamerClosed = true;
       return;
     }
-    setupPipelineInternal(nodes, storageTypes);
+    setupPipelineInternal(nodes, storageTypes, storageIDs);
   }

   protected void setupPipelineInternal(DatanodeInfo[] datanodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
@@ -1465,7 +1472,8 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes,
       accessToken = lb.getBlockToken();
       // set up the pipeline again with the remaining nodes
-      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+      success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
+          isRecovery);

       failPacket4Testing();
@@ -1601,7 +1609,8 @@ DatanodeInfo[] getExcludedNodes() {
   protected LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
+    String[] nextStorageIDs;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1617,10 +1626,12 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
+      nextStorageTypes = lb.getStorageTypes();
+      nextStorageIDs = lb.getStorageIDs();

       // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+      success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
+          0L, false);

       if (!success) {
         LOG.warn("Abandoning " + block);
@@ -1643,7 +1654,8 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
   // Returns true if success, otherwise return failure.
   //
   boolean createBlockOutputStream(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
+      long newGS, boolean recoveryFlag) {
     if (nodes.length == 0) {
       LOG.info("nodes are empty for write pipeline of " + block);
       return false;
@@ -1696,7 +1708,8 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes,
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS,
             checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
-            (targetPinnings != null && targetPinnings[0]), targetPinnings);
+            (targetPinnings != null && targetPinnings[0]), targetPinnings,
+            nodeStorageIDs[0], nodeStorageIDs);

         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
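Throughout DataStreamer the patch keeps a third array, the storage IDs, in lockstep with the existing node and storage-type arrays: nodes[i], nodeStorageTypes[i] and nodeStorageIDs[i] all describe the same pipeline slot, which is why every createBlockOutputStream and transfer call now takes all three. A standalone sketch of that alignment invariant follows; PipelineSlice and drop are hypothetical names for illustration, not part of the patch.

import java.util.Arrays;

public class PipelineSlice {
  // Drop pipeline slot 'bad' from one of the parallel arrays; callers must
  // apply the same drop to nodes, storage types and storage IDs together so
  // the index alignment survives a node failure.
  static <T> T[] drop(T[] arr, int bad) {
    T[] out = Arrays.copyOf(arr, arr.length - 1);
    System.arraycopy(arr, bad + 1, out, bad, arr.length - bad - 1);
    return out;
  }

  public static void main(String[] args) {
    String[] nodes = {"dn1", "dn2", "dn3"};   // stand-ins for DatanodeInfo
    String[] types = {"DISK", "SSD", "DISK"}; // stand-ins for StorageType
    String[] ids = {"s-1", "s-2", "s-3"};
    int bad = 1;                              // dn2 failed
    System.out.println(Arrays.toString(drop(nodes, bad))); // [dn1, dn3]
    System.out.println(Arrays.toString(drop(types, bad))); // [DISK, DISK]
    System.out.println(Arrays.toString(drop(ids, bad)));   // [s-1, s-3]
  }
}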

View File

@@ -100,9 +100,11 @@ protected LocatedBlock nextBlockOutputStream() throws IOException {
     DatanodeInfo[] nodes = lb.getLocations();
     StorageType[] storageTypes = lb.getStorageTypes();
+    String[] storageIDs = lb.getStorageIDs();

     // Connect to the DataNode. If fail the internal error state will be set.
-    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+    success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
+        false);

     if (!success) {
       block.setCurrentBlock(null);
@@ -121,7 +123,8 @@ LocatedBlock peekFollowingBlock() {
   @Override
   protected void setupPipelineInternal(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes) throws IOException {
+      StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
+      throws IOException {
     boolean success = false;
     while (!success && !streamerClosed() && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
@@ -141,7 +144,8 @@ protected void setupPipelineInternal(DatanodeInfo[] nodes,
       // set up the pipeline again with the remaining nodes. when a striped
       // data streamer comes here, it must be in external error state.
       assert getErrorState().hasExternalError();
-      success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+      success = createBlockOutputStream(nodes, nodeStorageTypes,
+          nodeStorageIDs, newGS, true);

       failPacket4Testing();
       getErrorState().checkRestartingNodeDeadline(nodes);

View File

@@ -101,6 +101,11 @@ void readBlock(final ExtendedBlock blk,
    *                          written to disk lazily
    * @param pinning whether to pin the block, so Balancer won't move it.
    * @param targetPinnings whether to pin the block on target datanode
+   * @param storageId optional StorageIDs designating where to write the
+   *                  block. An empty String or null indicates that this
+   *                  has not been provided.
+   * @param targetStorageIDs target StorageIDs corresponding to the target
+   *                         datanodes.
    */
   void writeBlock(final ExtendedBlock blk,
       final StorageType storageType,
@@ -118,7 +123,9 @@ void writeBlock(final ExtendedBlock blk,
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException;
+      final boolean[] targetPinnings,
+      final String storageID,
+      final String[] targetStorageIDs) throws IOException;

   /**
    * Transfer a block to another datanode.
    * The block stage must be
@@ -129,12 +136,15 @@ void writeBlock(final ExtendedBlock blk,
    * @param blockToken security token for accessing the block.
    * @param clientName client's name.
    * @param targets target datanodes.
+   * @param targetStorageIDs StorageID designating where to write the
+   *                         block.
    */
   void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException;
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIDs) throws IOException;

   /**
    * Request short circuit access file descriptors from a DataNode.
@@ -179,12 +189,15 @@ void requestShortCircuitFds(final ExtendedBlock blk,
    * @param blockToken security token for accessing the block.
    * @param delHint the hint for deleting the block in the original datanode.
    * @param source the source datanode for receiving the block.
+   * @param storageId an optional storage ID to designate where the block is
+   *                  replaced to.
    */
   void replaceBlock(final ExtendedBlock blk,
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException;
+      final DatanodeInfo source,
+      final String storageId) throws IOException;

   /**
    * Copy a block.
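The javadoc above fixes the convention for the new parameters: a storage ID is a hint, and null or the empty string means the datanode should choose its own volume. A minimal sketch of that null-or-empty test; isProvided is a hypothetical helper for illustration only, not an API from the patch.

final class StorageIdHint {
  private StorageIdHint() {
  }

  // "Provided" per the javadoc above: non-null and non-empty.
  static boolean isProvided(String storageId) {
    return storageId != null && !storageId.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(isProvided(null));      // false: datanode picks a volume
    System.out.println(isProvided(""));        // false: datanode picks a volume
    System.out.println(isProvided("DS-1234")); // true: write to that storage
  }
}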

View File

@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -132,7 +133,9 @@ public void writeBlock(final ExtendedBlock blk,
       final CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
@@ -154,11 +157,14 @@ public void writeBlock(final ExtendedBlock blk,
         .setCachingStrategy(getCachingStrategy(cachingStrategy))
         .setAllowLazyPersist(allowLazyPersist)
         .setPinning(pinning)
-        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
+        .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1))
+        .addAllTargetStorageIds(PBHelperClient.convert(targetStorageIds, 1));

     if (source != null) {
       proto.setSource(PBHelperClient.convertDatanodeInfo(source));
     }
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }

     send(out, Op.WRITE_BLOCK, proto.build());
   }
@@ -168,7 +174,8 @@ public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {

     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildClientHeader(
@@ -176,6 +183,7 @@ public void transferBlock(final ExtendedBlock blk,
         .addAllTargets(PBHelperClient.convert(targets))
         .addAllTargetStorageTypes(
             PBHelperClient.convertStorageTypes(targetStorageTypes))
+        .addAllTargetStorageIds(Arrays.asList(targetStorageIds))
         .build();

     send(out, Op.TRANSFER_BLOCK, proto);
@@ -233,15 +241,18 @@ public void replaceBlock(final ExtendedBlock blk,
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo source) throws IOException {
-    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+      final DatanodeInfo source,
+      final String storageId) throws IOException {
+    OpReplaceBlockProto.Builder proto = OpReplaceBlockProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
         .setStorageType(PBHelperClient.convertStorageType(storageType))
         .setDelHint(delHint)
-        .setSource(PBHelperClient.convertDatanodeInfo(source))
-        .build();
+        .setSource(PBHelperClient.convertDatanodeInfo(source));
+    if (storageId != null) {
+      proto.setStorageId(storageId);
+    }

-    send(out, Op.REPLACE_BLOCK, proto);
+    send(out, Op.REPLACE_BLOCK, proto.build());
   }

   @Override

View File

@@ -345,6 +345,16 @@ public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
     return pinnings;
   }

+  public static List<String> convert(String[] targetIds, int idx) {
+    List<String> ids = new ArrayList<>();
+    if (targetIds != null) {
+      for (; idx < targetIds.length; ++idx) {
+        ids.add(targetIds[idx]);
+      }
+    }
+    return ids;
+  }
+
   public static ExtendedBlock convert(ExtendedBlockProto eb) {
     if (eb == null) return null;
     return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
@@ -640,6 +650,9 @@ public static BlockTokenSecretProto convert(
     for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
       builder.addStorageTypes(convertStorageType(storageType));
     }
+    for (String storageId : blockTokenSecret.getStorageIds()) {
+      builder.addStorageIds(storageId);
+    }
     return builder.build();
   }
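The new convert(String[], int) mirrors the boolean overload used for targetPinnings: Sender calls it with idx = 1, so the repeated targetStorageIds proto field carries only the downstream storage IDs, while the first ID travels separately in the scalar storageId field. A usage sketch of the same logic; ConvertDemo is a hypothetical wrapper, with the method body copied from the helper above.

import java.util.ArrayList;
import java.util.List;

public class ConvertDemo {
  // Same shape as the PBHelperClient helper above: copy from 'idx' onward,
  // tolerating a null input array.
  public static List<String> convert(String[] targetIds, int idx) {
    List<String> ids = new ArrayList<>();
    if (targetIds != null) {
      for (; idx < targetIds.length; ++idx) {
        ids.add(targetIds[idx]);
      }
    }
    return ids;
  }

  public static void main(String[] args) {
    String[] pipeline = {"s-first", "s-second", "s-third"};
    System.out.println(convert(pipeline, 1)); // [s-second, s-third]
    System.out.println(convert(null, 1));     // []
  }
}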

View File

@@ -53,16 +53,19 @@ public enum AccessMode {
   private long blockId;
   private final EnumSet<AccessMode> modes;
   private StorageType[] storageTypes;
+  private String[] storageIds;
   private boolean useProto;

   private byte [] cache;

   public BlockTokenIdentifier() {
-    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
+    this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, null,
+        false);
   }

   public BlockTokenIdentifier(String userId, String bpid, long blockId,
-      EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
+      EnumSet<AccessMode> modes, StorageType[] storageTypes,
+      String[] storageIds, boolean useProto) {
     this.cache = null;
     this.userId = userId;
     this.blockPoolId = bpid;
@@ -70,6 +73,8 @@ public BlockTokenIdentifier(String userId, String bpid, long blockId,
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
     this.storageTypes = Optional.ofNullable(storageTypes)
         .orElse(StorageType.EMPTY_ARRAY);
+    this.storageIds = Optional.ofNullable(storageIds)
+        .orElse(new String[0]);
     this.useProto = useProto;
   }
@@ -125,6 +130,10 @@ public StorageType[] getStorageTypes(){
     return storageTypes;
   }

+  public String[] getStorageIds(){
+    return storageIds;
+  }
+
   @Override
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
@@ -132,7 +141,8 @@ public String toString() {
         + ", blockPoolId=" + this.getBlockPoolId()
         + ", blockId=" + this.getBlockId() + ", access modes="
         + this.getAccessModes() + ", storageTypes= "
-        + Arrays.toString(this.getStorageTypes()) + ")";
+        + Arrays.toString(this.getStorageTypes()) + ", storageIds= "
+        + Arrays.toString(this.getStorageIds()) + ")";
   }

   static boolean isEqual(Object a, Object b) {
@@ -151,7 +161,8 @@ && isEqual(this.userId, that.userId)
           && isEqual(this.blockPoolId, that.blockPoolId)
           && this.blockId == that.blockId
           && isEqual(this.modes, that.modes)
-          && Arrays.equals(this.storageTypes, that.storageTypes);
+          && Arrays.equals(this.storageTypes, that.storageTypes)
+          && Arrays.equals(this.storageIds, that.storageIds);
     }
     return false;
   }
@@ -161,7 +172,8 @@ public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
         ^ (userId == null ? 0 : userId.hashCode())
         ^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
-        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
+        ^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes))
+        ^ (storageIds == null ? 0 : Arrays.hashCode(storageIds));
   }

   /**
@@ -220,6 +232,14 @@ void readFieldsLegacy(DataInput in) throws IOException {
       readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
     }
     storageTypes = readStorageTypes;
+
+    length = WritableUtils.readVInt(in);
+    String[] readStorageIds = new String[length];
+    for (int i = 0; i < length; i++) {
+      readStorageIds[i] = WritableUtils.readString(in);
+    }
+    storageIds = readStorageIds;
+
     useProto = false;
   }
@@ -248,6 +268,8 @@ void readFieldsProtobuf(DataInput in) throws IOException {
     storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
         .map(PBHelperClient::convertStorageType)
         .toArray(StorageType[]::new);
+    storageIds = blockTokenSecretProto.getStorageIdsList().stream()
+        .toArray(String[]::new);
     useProto = true;
   }
@@ -275,6 +297,10 @@ void writeLegacy(DataOutput out) throws IOException {
     for (StorageType type: storageTypes){
       WritableUtils.writeEnum(out, type);
     }
+    WritableUtils.writeVInt(out, storageIds.length);
+    for (String id: storageIds) {
+      WritableUtils.writeString(out, id);
+    }
   }

   @VisibleForTesting
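The legacy (Writable) wire format gains a vint count followed by that many strings, appended after the storage types; readFieldsLegacy reads the fields back in the same order writeLegacy emits them. A round-trip sketch of just the new field, assuming hadoop-common (WritableUtils) is on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.WritableUtils;

public class LegacyStorageIdsRoundTrip {
  public static void main(String[] args) throws IOException {
    String[] storageIds = {"DS-aaaa", "DS-bbbb"};

    // Encode: same shape as the writeLegacy() addition above.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    WritableUtils.writeVInt(out, storageIds.length);
    for (String id : storageIds) {
      WritableUtils.writeString(out, id);
    }

    // Decode: same shape as the readFieldsLegacy() addition above.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    int length = WritableUtils.readVInt(in);
    String[] read = new String[length];
    for (int i = 0; i < length; i++) {
      read[i] = WritableUtils.readString(in);
    }
    System.out.println(Arrays.toString(read)); // [DS-aaaa, DS-bbbb]
  }
}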

View File

@@ -125,12 +125,15 @@ message OpWriteBlockProto {
   //whether to pin the block, so Balancer won't move it.
   optional bool pinning = 14 [default = false];
   repeated bool targetPinnings = 15;
+  optional string storageId = 16;
+  repeated string targetStorageIds = 17;
 }

 message OpTransferBlockProto {
   required ClientOperationHeaderProto header = 1;
   repeated DatanodeInfoProto targets = 2;
   repeated StorageTypeProto targetStorageTypes = 3;
+  repeated string targetStorageIds = 4;
 }

 message OpReplaceBlockProto {
@@ -138,6 +141,7 @@ message OpReplaceBlockProto {
   required string delHint = 2;
   required DatanodeInfoProto source = 3;
   optional StorageTypeProto storageType = 4 [default = DISK];
+  optional string storageId = 5;
 }

 message OpCopyBlockProto {

View File

@@ -570,4 +570,5 @@ message BlockTokenSecretProto {
   optional uint64 blockId = 5;
   repeated AccessModeProto modes = 6;
   repeated StorageTypeProto storageTypes = 7;
+  repeated string storageIds = 8;
 }

View File

@@ -25,7 +25,9 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -185,7 +187,9 @@ private void opWriteBlock(DataInputStream in) throws IOException {
             CachingStrategy.newDefaultStrategy()),
         (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
         (proto.hasPinning() ? proto.getPinning(): false),
-        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())));
+        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
+        proto.getStorageId(),
+        proto.getTargetStorageIdsList().toArray(new String[0]));
   } finally {
     if (traceScope != null) traceScope.close();
   }
@@ -199,11 +203,18 @@ private void opTransferBlock(DataInputStream in) throws IOException {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
+      final ExtendedBlock block =
+          PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock());
+      final StorageType[] targetStorageTypes =
+          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(),
+              targets.length);
+      transferBlock(block,
           PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
-          PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+          targetStorageTypes,
+          proto.getTargetStorageIdsList().toArray(new String[0])
+      );
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -264,7 +275,8 @@ private void opReplaceBlock(DataInputStream in) throws IOException {
         PBHelperClient.convertStorageType(proto.getStorageType()),
         PBHelperClient.convert(proto.getHeader().getToken()),
         proto.getDelHint(),
-        PBHelperClient.convert(proto.getSource()));
+        PBHelperClient.convert(proto.getSource()),
+        proto.getStorageId());
   } finally {
     if (traceScope != null) traceScope.close();
   }

View File

@@ -84,25 +84,27 @@ public byte[] retrievePassword(BlockTokenIdentifier identifier)
   /**
    * See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
    * String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   * StorageType[])}
+   * StorageType[], String[])}
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }

   /**
    * See {@link BlockTokenSecretManager#checkAccess(Token, String,
    * ExtendedBlock, BlockTokenIdentifier.AccessMode,
-   * StorageType[])}.
+   * StorageType[], String[])}
    */
   public void checkAccess(Token<BlockTokenIdentifier> token,
       String userId, ExtendedBlock block, AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds)
+      throws InvalidToken {
     get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
-        storageTypes);
+        storageTypes, storageIds);
   }

   /**
@@ -115,11 +117,13 @@ public void addKeys(String bpid, ExportedBlockKeys exportedKeys)
   /**
    * See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
-   * StorageType[])}
+   * StorageType[], String[])}.
    */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
-      EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
-    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
+      EnumSet<AccessMode> of, StorageType[] storageTypes, String[] storageIds)
+      throws IOException {
+    return get(b.getBlockPoolId()).generateToken(b, of, storageTypes,
+        storageIds);
   }

   @VisibleForTesting

View File

@@ -247,18 +247,19 @@ synchronized boolean updateKeys() throws IOException {
   /** Generate an block token for current user */
   public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
       EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String userID = (ugi == null ? null : ugi.getShortUserName());
-    return generateToken(userID, block, modes, storageTypes);
+    return generateToken(userID, block, modes, storageTypes, storageIds);
   }

   /** Generate a block token for a specified user */
   public Token<BlockTokenIdentifier> generateToken(String userId,
       ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
-        .getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
+        .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
+        storageIds, useProto);
     return new Token<BlockTokenIdentifier>(id, this);
   }
@@ -272,10 +273,13 @@ public Token<BlockTokenIdentifier> generateToken(String userId,
    */
   public void checkAccess(BlockTokenIdentifier id, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     checkAccess(id, userId, block, mode);
     if (storageTypes != null && storageTypes.length > 0) {
-      checkAccess(id.getStorageTypes(), storageTypes);
+      checkAccess(id.getStorageTypes(), storageTypes, "StorageTypes");
+    }
+    if (storageIds != null && storageIds.length > 0) {
+      checkAccess(id.getStorageIds(), storageIds, "StorageIDs");
     }
   }
@@ -309,30 +313,31 @@ public void checkAccess(BlockTokenIdentifier id, String userId,
   }

   /**
-   * Check if the requested StorageTypes match the StorageTypes in the
-   * BlockTokenIdentifier.
-   * Empty candidateStorageTypes specifiers mean 'all is permitted'. They
-   * would otherwise be nonsensical.
+   * Check if the requested values can be satisfied with the values in the
+   * BlockToken. This is intended for use with StorageTypes and StorageIDs.
+   *
+   * The current node can only verify that one of the storage [Type|ID] is
+   * available. The rest will be on different nodes.
    */
-  public static void checkAccess(StorageType[] candidateStorageTypes,
-      StorageType[] storageTypesRequested) throws InvalidToken {
-    if (storageTypesRequested.length == 0) {
-      throw new InvalidToken("The request has no StorageTypes. "
+  public static <T> void checkAccess(T[] candidates, T[] requested, String msg)
+      throws InvalidToken {
+    if (requested.length == 0) {
+      throw new InvalidToken("The request has no " + msg + ". "
           + "This is probably a configuration error.");
     }
-    if (candidateStorageTypes.length == 0) {
+    if (candidates.length == 0) {
       return;
     }

-    List<StorageType> unseenCandidates = new ArrayList<StorageType>();
-    unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
-    for (StorageType storageType : storageTypesRequested) {
-      final int index = unseenCandidates.indexOf(storageType);
+    List<T> unseenCandidates = new ArrayList<T>();
+    unseenCandidates.addAll(Arrays.asList(candidates));
+    for (T req : requested) {
+      final int index = unseenCandidates.indexOf(req);
       if (index == -1) {
-        throw new InvalidToken("Block token with StorageTypes "
-            + Arrays.toString(candidateStorageTypes)
-            + " not valid for access with StorageTypes "
-            + Arrays.toString(storageTypesRequested));
+        throw new InvalidToken("Block token with " + msg + " "
+            + Arrays.toString(candidates)
+            + " not valid for access with " + msg + " "
+            + Arrays.toString(requested));
       }
       Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
       unseenCandidates.remove(unseenCandidates.size()-1);
@@ -342,7 +347,7 @@ public static void checkAccess(StorageType[] candidateStorageTypes,
   /** Check if access should be allowed. userID is not checked if null */
   public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
       ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
-      StorageType[] storageTypes) throws InvalidToken {
+      StorageType[] storageTypes, String[] storageIds) throws InvalidToken {
     BlockTokenIdentifier id = new BlockTokenIdentifier();
     try {
       id.readFields(new DataInputStream(new ByteArrayInputStream(token
@@ -352,7 +357,7 @@ public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
           "Unable to de-serialize block token identifier for user=" + userId
           + ", block=" + block + ", access mode=" + mode);
     }
-    checkAccess(id, userId, block, mode, storageTypes);
+    checkAccess(id, userId, block, mode, storageTypes, storageIds);
     if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
       throw new InvalidToken("Block token with " + id.toString()
           + " doesn't have the correct token password");

View File

@@ -357,7 +357,7 @@ private void dispatch() {
             reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager();
         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
-            new StorageType[]{target.storageType});
+            new StorageType[]{target.storageType}, new String[0]);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
             unbufIn, km, accessToken, target.getDatanodeInfo());
         unbufOut = saslStreams.out;
@@ -411,7 +411,8 @@ private void dispatch() {
     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
-          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
+          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode,
+          null);
     }

     /** Check whether to continue waiting for response */

View File

@@ -95,7 +95,7 @@ public void startBlockKeyUpdater() {
   /** Get an access token for a block. */
   public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     if (!isBlockTokenEnabled) {
       return BlockTokenSecretManager.DUMMY_TOKEN;
     } else {
@@ -105,7 +105,7 @@ public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
       }
       return blockTokenSecretManager.generateToken(null, eb,
           EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
-              BlockTokenIdentifier.AccessMode.COPY), storageTypes);
+              BlockTokenIdentifier.AccessMode.COPY), storageTypes, storageIds);
     }
   }

View File

@@ -1283,13 +1283,15 @@ public void setBlockToken(final LocatedBlock b,
         internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
         blockTokens[i] = blockTokenSecretManager.generateToken(
             NameNode.getRemoteUser().getShortUserName(),
-            internalBlock, EnumSet.of(mode), b.getStorageTypes());
+            internalBlock, EnumSet.of(mode), b.getStorageTypes(),
+            b.getStorageIDs());
       }
       sb.setBlockTokens(blockTokens);
     } else {
       b.setBlockToken(blockTokenSecretManager.generateToken(
           NameNode.getRemoteUser().getShortUserName(),
-          b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
+          b.getBlock(), EnumSet.of(mode), b.getStorageTypes(),
+          b.getStorageIDs()));
     }
   }
 }

View File

@@ -679,7 +679,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
       dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
-          bcmd.getTargets(), bcmd.getTargetStorageTypes());
+          bcmd.getTargets(), bcmd.getTargetStorageTypes(),
+          bcmd.getTargetStorageIDs());
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
       //

View File

@@ -151,7 +151,8 @@ class BlockReceiver implements Closeable {
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
-      final boolean pinning) throws IOException {
+      final boolean pinning,
+      final String storageId) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -197,6 +198,7 @@ class BlockReceiver implements Closeable {
             + "\n allowLazyPersist=" + allowLazyPersist + ", pinning=" + pinning
             + ", isClient=" + isClient + ", isDatanode=" + isDatanode
             + ", responseInterval=" + responseInterval
+            + ", storageID=" + (storageId != null ? storageId : "null")
         );
       }
@@ -204,11 +206,13 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler =
+            datanode.data.createTemporary(storageType, storageId, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, storageId,
+              block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
@@ -233,7 +237,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              datanode.data.createTemporary(storageType, storageId, block);
           break;
         default: throw new IOException("Unsupported stage " + stage +
               " while receiving block " + block + " from " + inAddr);

View File

@@ -1943,7 +1943,7 @@ private void checkBlockToken(ExtendedBlock block,
         LOG.debug("Got: " + id.toString());
       }
       blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
-          null);
+          null, null);
     }
   }
@@ -2224,7 +2224,8 @@ private void reportBadBlock(final BPOfferService bpos,
   @VisibleForTesting
   void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
-      StorageType[] xferTargetStorageTypes) throws IOException {
+      StorageType[] xferTargetStorageTypes, String[] xferTargetStorageIDs)
+      throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -2281,17 +2282,19 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
       LOG.info(bpReg + " Starting thread to transfer " +
           block + " to " + xfersBuilder);
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
+      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
+          xferTargetStorageIDs, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }

   void transferBlocks(String poolId, Block blocks[],
-      DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
+      DatanodeInfo[][] xferTargets, StorageType[][] xferTargetStorageTypes,
+      String[][] xferTargetStorageIDs) {
     for (int i = 0; i < blocks.length; i++) {
       try {
         transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
-            xferTargetStorageTypes[i]);
+            xferTargetStorageTypes[i], xferTargetStorageIDs[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -2395,6 +2398,7 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
     final StorageType[] targetStorageTypes;
+    final private String[] targetStorageIds;
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
@@ -2406,8 +2410,8 @@ private class DataTransfer implements Runnable {
      * entire target list, the block, and the data.
      */
     DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
-        ExtendedBlock b, BlockConstructionStage stage,
-        final String clientname) {
+        String[] targetStorageIds, ExtendedBlock b,
+        BlockConstructionStage stage, final String clientname) {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
             + b + " (numBytes=" + b.getNumBytes() + ")"
@@ -2415,10 +2419,13 @@ private class DataTransfer implements Runnable {
             + ", clientname=" + clientname
             + ", targets=" + Arrays.asList(targets)
             + ", target storage types=" + (targetStorageTypes == null ? "[]" :
-            Arrays.asList(targetStorageTypes)));
+            Arrays.asList(targetStorageTypes))
+            + ", target storage IDs=" + (targetStorageIds == null ? "[]" :
+            Arrays.asList(targetStorageIds)));
       }
       this.targets = targets;
       this.targetStorageTypes = targetStorageTypes;
+      this.targetStorageIds = targetStorageIds;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -2456,7 +2463,7 @@ public void run() {
         //
         Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
             EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
-            targetStorageTypes);
+            targetStorageTypes, targetStorageIds);

         long writeTimeout = dnConf.socketWriteTimeout +
             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2477,10 +2484,13 @@ public void run() {
         DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
             .build();

+        String storageId = targetStorageIds.length > 0 ?
+            targetStorageIds[0] : null;
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false, false, null);
+            false, false, null, storageId,
+            targetStorageIds);

         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
@@ -2540,12 +2550,12 @@ public void run() {
    */
   public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
       EnumSet<AccessMode> mode,
-      StorageType[] storageTypes) throws IOException {
+      StorageType[] storageTypes, String[] storageIds) throws IOException {
     Token<BlockTokenIdentifier> accessToken =
         BlockTokenSecretManager.DUMMY_TOKEN;
     if (isBlockTokenEnabled) {
       accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
-          storageTypes);
+          storageTypes, storageIds);
     }
     return accessToken;
   }
@@ -2918,7 +2928,7 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException {
         LOG.debug("Got: " + id.toString());
       }
       blockPoolTokenSecretManager.checkAccess(id, null, block,
-          BlockTokenIdentifier.AccessMode.READ, null);
+          BlockTokenIdentifier.AccessMode.READ, null, null);
     }
   }
 }
@@ -2934,7 +2944,8 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException {
    */
   void transferReplicaForPipelineRecovery(final ExtendedBlock b,
       final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
-      final String client) throws IOException {
+      final String[] targetStorageIds, final String client)
+      throws IOException {
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2967,7 +2978,8 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     b.setNumBytes(visible);

     if (targets.length > 0) {
-      new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
+      new DataTransfer(targets, targetStorageTypes, targetStorageIds, b, stage,
+          client).run();
     }
   }

View File

@@ -354,7 +354,8 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
     updateCurrentThreadName("Passing file descriptors for block " + blk);
     DataOutputStream out = getBufferedOutputStream();
     checkAccess(out, true, blk, token,
-        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ);
+        Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ,
+        null, null);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
     SlotId registeredSlotId = null;
@@ -662,7 +663,7 @@ public void writeBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientname,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes,
+      final StorageType[] targetStorageTypes,
       final DatanodeInfo srcDataNode,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -673,7 +674,9 @@ public void writeBlock(final ExtendedBlock block,
       CachingStrategy cachingStrategy,
       boolean allowLazyPersist,
       final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
+      final boolean[] targetPinnings,
+      final String storageId,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -692,8 +695,15 @@ public void writeBlock(final ExtendedBlock block,
     if (targetStorageTypes.length > 0) {
       System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
     }
+    int nsi = targetStorageIds.length;
+    String[] storageIds = new String[nsi + 1];
+    storageIds[0] = storageId;
+    if (targetStorageTypes.length > 0) {
+      System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
+    }
     checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
-        BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
+        BlockTokenIdentifier.AccessMode.WRITE,
+        storageTypes, storageIds);

     // check single target for transfer-RBW/Finalized
     if (isTransfer && targets.length > 0) {
@@ -743,7 +753,7 @@ public void writeBlock(final ExtendedBlock block,
           peer.getLocalAddressString(),
           stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
           clientname, srcDataNode, datanode, requestedChecksum,
-          cachingStrategy, allowLazyPersist, pinning));
+          cachingStrategy, allowLazyPersist, pinning, storageId));
       replica = blockReceiver.getReplica();
     } else {
       replica = datanode.data.recoverClose(
@@ -796,16 +806,18 @@ public void writeBlock(final ExtendedBlock block,
         if (targetPinnings != null && targetPinnings.length > 0) {
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              blockToken, clientname, targets, targetStorageTypes,
+              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
               latestGenerationStamp, requestedChecksum, cachingStrategy,
-              allowLazyPersist, targetPinnings[0], targetPinnings);
+              allowLazyPersist, targetPinnings[0], targetPinnings,
+              targetStorageIds[0], targetStorageIds);
         } else {
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
-              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
-              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              blockToken, clientname, targets, targetStorageTypes,
+              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
               latestGenerationStamp, requestedChecksum, cachingStrategy,
-              allowLazyPersist, false, targetPinnings);
+              allowLazyPersist, false, targetPinnings,
+              targetStorageIds[0], targetStorageIds);
         }

         mirrorOut.flush();
@@ -929,17 +941,19 @@ public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
+      final StorageType[] targetStorageTypes,
+      final String[] targetStorageIds) throws IOException {
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);

     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
-        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
+        BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes,
+        targetStorageIds);
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets,
-          targetStorageTypes, clientName);
+          targetStorageTypes, targetStorageIds, clientName);
       writeResponse(Status.SUCCESS, null, out);
     } catch (IOException ioe) {
       LOG.info("transferBlock " + blk + " received exception " + ioe);
@@ -1105,12 +1119,14 @@ public void replaceBlock(final ExtendedBlock block,
       final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
-      final DatanodeInfo proxySource) throws IOException {
+      final DatanodeInfo proxySource,
+      final String storageId) throws IOException {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     checkAccess(replyOut, true, block, blockToken,
         Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
-        new StorageType[]{ storageType });
+        new StorageType[]{storageType},
+        new String[]{storageId});

     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to receive block " + block.getBlockId() +
@@ -1131,7 +1147,7 @@ public void replaceBlock(final ExtendedBlock block,
       // Move the block to different storage in the same datanode
       if (proxySource.equals(datanode.getDatanodeId())) {
         ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
-            storageType);
+            storageType, storageId);
         if (oldReplica != null) {
           LOG.info("Moved " + block + " from StorageType "
               + oldReplica.getVolume().getStorageType() + " to " + storageType);
@@ -1188,7 +1204,7 @@ public void replaceBlock(final ExtendedBlock block,
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false, false));
+          CachingStrategy.newDropBehind(), false, false, storageId));

       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null,
@@ -1258,11 +1274,12 @@ BlockReceiver getBlockReceiver(
       final DataNode dn, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy, CachingStrategy cachingStrategy,
final boolean allowLazyPersist, final boolean allowLazyPersist,
final boolean pinning) throws IOException { final boolean pinning,
final String storageId) throws IOException {
return new BlockReceiver(block, storageType, in, return new BlockReceiver(block, storageType, in,
inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd, inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, dn, requestedChecksum, clientname, srcDataNode, dn, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning); cachingStrategy, allowLazyPersist, pinning, storageId);
} }
/** /**
@ -1365,7 +1382,7 @@ void checkAndWaitForBP(final ExtendedBlock block)
private void checkAccess(OutputStream out, final boolean reply, private void checkAccess(OutputStream out, final boolean reply,
ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op, ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
BlockTokenIdentifier.AccessMode mode) throws IOException { BlockTokenIdentifier.AccessMode mode) throws IOException {
checkAccess(out, reply, blk, t, op, mode, null); checkAccess(out, reply, blk, t, op, mode, null, null);
} }
private void checkAccess(OutputStream out, final boolean reply, private void checkAccess(OutputStream out, final boolean reply,
@ -1373,7 +1390,8 @@ private void checkAccess(OutputStream out, final boolean reply,
final Token<BlockTokenIdentifier> t, final Token<BlockTokenIdentifier> t,
final Op op, final Op op,
final BlockTokenIdentifier.AccessMode mode, final BlockTokenIdentifier.AccessMode mode,
final StorageType[] storageTypes) throws IOException { final StorageType[] storageTypes,
final String[] storageIds) throws IOException {
checkAndWaitForBP(blk); checkAndWaitForBP(blk);
if (datanode.isBlockTokenEnabled) { if (datanode.isBlockTokenEnabled) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -1382,7 +1400,7 @@ private void checkAccess(OutputStream out, final boolean reply,
} }
try { try {
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode, datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
storageTypes); storageTypes, storageIds);
} catch(InvalidToken e) { } catch(InvalidToken e) {
try { try {
if (reply) { if (reply) {
@ -111,7 +111,8 @@ public void processErasureCodingTasks(
new StripedReconstructionInfo( new StripedReconstructionInfo(
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(), reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes()); reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
reconInfo.getTargetStorageIDs());
final StripedBlockReconstructor task = final StripedBlockReconstructor task =
new StripedBlockReconstructor(this, stripedReconInfo); new StripedBlockReconstructor(this, stripedReconInfo);
if (task.hasValidTargets()) { if (task.hasValidTargets()) {
@ -110,7 +110,7 @@ private BlockReader createBlockReader(long offsetInBlock) {
stripedReader.getSocketAddress4Transfer(source); stripedReader.getSocketAddress4Transfer(source);
Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken( Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ), block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
StorageType.EMPTY_ARRAY); StorageType.EMPTY_ARRAY, new String[0]);
/* /*
* This can be further improved if the replica is local, then we can * This can be further improved if the replica is local, then we can
* read directly from DN and need to check the replica is FINALIZED * read directly from DN and need to check the replica is FINALIZED
@ -61,6 +61,7 @@ class StripedBlockWriter {
private final ExtendedBlock block; private final ExtendedBlock block;
private final DatanodeInfo target; private final DatanodeInfo target;
private final StorageType storageType; private final StorageType storageType;
private final String storageId;
private Socket targetSocket; private Socket targetSocket;
private DataOutputStream targetOutputStream; private DataOutputStream targetOutputStream;
@ -72,8 +73,8 @@ class StripedBlockWriter {
StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode, StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
Configuration conf, ExtendedBlock block, Configuration conf, ExtendedBlock block,
DatanodeInfo target, StorageType storageType) DatanodeInfo target, StorageType storageType,
throws IOException { String storageId) throws IOException {
this.stripedWriter = stripedWriter; this.stripedWriter = stripedWriter;
this.datanode = datanode; this.datanode = datanode;
this.conf = conf; this.conf = conf;
@ -81,6 +82,7 @@ class StripedBlockWriter {
this.block = block; this.block = block;
this.target = target; this.target = target;
this.storageType = storageType; this.storageType = storageType;
this.storageId = storageId;
this.targetBuffer = stripedWriter.allocateWriteBuffer(); this.targetBuffer = stripedWriter.allocateWriteBuffer();
@ -117,7 +119,7 @@ private void init() throws IOException {
Token<BlockTokenIdentifier> blockToken = Token<BlockTokenIdentifier> blockToken =
datanode.getBlockAccessToken(block, datanode.getBlockAccessToken(block,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{storageType}); new StorageType[]{storageType}, new String[]{storageId});
long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
@ -141,7 +143,7 @@ private void init() throws IOException {
new StorageType[]{storageType}, source, new StorageType[]{storageType}, source,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(), stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(),
false, false, null); false, false, null, storageId, new String[]{storageId});
targetSocket = socket; targetSocket = socket;
targetOutputStream = out; targetOutputStream = out;
@ -40,24 +40,27 @@ public class StripedReconstructionInfo {
private final byte[] targetIndices; private final byte[] targetIndices;
private final DatanodeInfo[] targets; private final DatanodeInfo[] targets;
private final StorageType[] targetStorageTypes; private final StorageType[] targetStorageTypes;
private final String[] targetStorageIds;
public StripedReconstructionInfo(ExtendedBlock blockGroup, public StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices) { byte[] targetIndices) {
this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null); this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
null, null);
} }
StripedReconstructionInfo(ExtendedBlock blockGroup, StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
DatanodeInfo[] targets, StorageType[] targetStorageTypes) { DatanodeInfo[] targets, StorageType[] targetStorageTypes,
String[] targetStorageIds) {
this(blockGroup, ecPolicy, liveIndices, sources, null, targets, this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
targetStorageTypes); targetStorageTypes, targetStorageIds);
} }
private StripedReconstructionInfo(ExtendedBlock blockGroup, private StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices, DatanodeInfo[] targets, byte[] targetIndices, DatanodeInfo[] targets,
StorageType[] targetStorageTypes) { StorageType[] targetStorageTypes, String[] targetStorageIds) {
this.blockGroup = blockGroup; this.blockGroup = blockGroup;
this.ecPolicy = ecPolicy; this.ecPolicy = ecPolicy;
@ -66,6 +69,7 @@ private StripedReconstructionInfo(ExtendedBlock blockGroup,
this.targetIndices = targetIndices; this.targetIndices = targetIndices;
this.targets = targets; this.targets = targets;
this.targetStorageTypes = targetStorageTypes; this.targetStorageTypes = targetStorageTypes;
this.targetStorageIds = targetStorageIds;
} }
ExtendedBlock getBlockGroup() { ExtendedBlock getBlockGroup() {
@ -95,5 +99,9 @@ DatanodeInfo[] getTargets() {
StorageType[] getTargetStorageTypes() { StorageType[] getTargetStorageTypes() {
return targetStorageTypes; return targetStorageTypes;
} }
String[] getTargetStorageIds() {
return targetStorageIds;
}
} }
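For illustration, not from the patch: with the extra field in place, an erasure-coding reconstruction task is built with the nominated storage IDs alongside the target datanodes and their storage types. A hedged sketch, assuming all arguments are already in scope:

  // Sketch: thread the namenode-supplied target storage IDs into a
  // reconstruction task via the widened package-private constructor.
  StripedReconstructionInfo stripedReconInfo =
      new StripedReconstructionInfo(
          blockGroup, ecPolicy, liveIndices, sources,
          targets, targetStorageTypes, targetStorageIds);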
@ -55,6 +55,7 @@ class StripedWriter {
private final short[] targetIndices; private final short[] targetIndices;
private boolean hasValidTargets; private boolean hasValidTargets;
private final StorageType[] targetStorageTypes; private final StorageType[] targetStorageTypes;
private final String[] targetStorageIds;
private StripedBlockWriter[] writers; private StripedBlockWriter[] writers;
@ -77,6 +78,8 @@ class StripedWriter {
assert targets != null; assert targets != null;
this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes(); this.targetStorageTypes = stripedReconInfo.getTargetStorageTypes();
assert targetStorageTypes != null; assert targetStorageTypes != null;
this.targetStorageIds = stripedReconInfo.getTargetStorageIds();
assert targetStorageIds != null;
writers = new StripedBlockWriter[targets.length]; writers = new StripedBlockWriter[targets.length];
@ -192,7 +195,7 @@ int initTargetStreams() {
private StripedBlockWriter createWriter(short index) throws IOException { private StripedBlockWriter createWriter(short index) throws IOException {
return new StripedBlockWriter(this, datanode, conf, return new StripedBlockWriter(this, datanode, conf,
reconstructor.getBlock(targetIndices[index]), targets[index], reconstructor.getBlock(targetIndices[index]), targets[index],
targetStorageTypes[index]); targetStorageTypes[index], targetStorageIds[index]);
} }
ByteBuffer allocateWriteBuffer() { ByteBuffer allocateWriteBuffer() {
@ -113,8 +113,8 @@ public Configuration getConf() {
new RoundRobinVolumeChoosingPolicy<V>(); new RoundRobinVolumeChoosingPolicy<V>();
@Override @Override
public V chooseVolume(List<V> volumes, public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
long replicaSize) throws IOException { throws IOException {
if (volumes.size() < 1) { if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes"); throw new DiskOutOfSpaceException("No more available volumes");
} }
@ -125,19 +125,20 @@ public V chooseVolume(List<V> volumes,
storageType.ordinal() : StorageType.DEFAULT.ordinal(); storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) { synchronized (syncLocks[index]) {
return doChooseVolume(volumes, replicaSize); return doChooseVolume(volumes, replicaSize, storageId);
} }
} }
private V doChooseVolume(final List<V> volumes, private V doChooseVolume(final List<V> volumes, long replicaSize,
long replicaSize) throws IOException { String storageId) throws IOException {
AvailableSpaceVolumeList volumesWithSpaces = AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes); new AvailableSpaceVolumeList(volumes);
if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) { if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
// If they're actually not too far out of whack, fall back on pure round // If they're actually not too far out of whack, fall back on pure round
// robin. // robin.
V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize); V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize,
storageId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("All volumes are within the configured free space balance " + LOG.debug("All volumes are within the configured free space balance " +
"threshold. Selecting " + volume + " for write of block size " + "threshold. Selecting " + volume + " for write of block size " +
@ -165,7 +166,7 @@ private V doChooseVolume(final List<V> volumes,
if (mostAvailableAmongLowVolumes < replicaSize || if (mostAvailableAmongLowVolumes < replicaSize ||
random.nextFloat() < scaledPreferencePercent) { random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume( volume = roundRobinPolicyHighAvailable.chooseVolume(
highAvailableVolumes, replicaSize); highAvailableVolumes, replicaSize, storageId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume + LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size " " from high available space volumes for write of block size "
@ -173,7 +174,7 @@ private V doChooseVolume(final List<V> volumes,
} }
} else { } else {
volume = roundRobinPolicyLowAvailable.chooseVolume( volume = roundRobinPolicyLowAvailable.chooseVolume(
lowAvailableVolumes, replicaSize); lowAvailableVolumes, replicaSize, storageId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume + LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size " " from low available space volumes for write of block size "
@ -266,7 +267,8 @@ public List<AvailableSpaceVolumePair> getVolumesWithHighAvailableSpace() {
/** /**
* Used so that we only check the available space on a given volume once, at * Used so that we only check the available space on a given volume once, at
* the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}. * the beginning of
* {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}.
*/ */
private class AvailableSpaceVolumePair { private class AvailableSpaceVolumePair {
private final V volume; private final V volume;
@ -318,7 +318,7 @@ ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
* @return the meta info of the replica which is being written to * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs * @throws IOException if an error occurs
*/ */
ReplicaHandler createTemporary(StorageType storageType, ReplicaHandler createTemporary(StorageType storageType, String storageId,
ExtendedBlock b) throws IOException; ExtendedBlock b) throws IOException;
/** /**
@ -328,7 +328,7 @@ ReplicaHandler createTemporary(StorageType storageType,
* @return the meta info of the replica which is being written to * @return the meta info of the replica which is being written to
* @throws IOException if an error occurs * @throws IOException if an error occurs
*/ */
ReplicaHandler createRbw(StorageType storageType, ReplicaHandler createRbw(StorageType storageType, String storageId,
ExtendedBlock b, boolean allowLazyPersist) throws IOException; ExtendedBlock b, boolean allowLazyPersist) throws IOException;
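A hedged usage sketch of the widened FsDatasetSpi methods (caller and variable names assumed, not from the patch): the new String parameter threads the namenode-nominated storage ID down to the volume choice, and passing null keeps the old, unhinted behavior.

  // Sketch: create a replica-being-written on the nominated storage.
  // storageId may be null, which means "no hint".
  ReplicaHandler handler =
      dataset.createRbw(StorageType.DISK, storageId, block, false);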
/** /**
@ -623,7 +623,7 @@ void onCompleteLazyPersist(String bpId, long blockId,
* Move block from one storage to another storage * Move block from one storage to another storage
*/ */
ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block, ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
StorageType targetStorageType) throws IOException; StorageType targetStorageType, String storageId) throws IOException;
/** /**
* Set a block to be pinned on this datanode so that it cannot be moved * Set a block to be pinned on this datanode so that it cannot be moved
@ -50,7 +50,7 @@ public RoundRobinVolumeChoosingPolicy() {
} }
@Override @Override
public V chooseVolume(final List<V> volumes, long blockSize) public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
throws IOException { throws IOException {
if (volumes.size() < 1) { if (volumes.size() < 1) {
@ -36,8 +36,11 @@ public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {
* *
* @param volumes - a list of available volumes. * @param volumes - a list of available volumes.
* @param replicaSize - the size of the replica for which a volume is sought. * @param replicaSize - the size of the replica for which a volume is sought.
* @param storageId - the storage id of the Volume nominated by the namenode.
* This can usually be ignored by the VolumeChoosingPolicy.
* @return the chosen volume. * @return the chosen volume.
* @throws IOException when disks are unavailable or are full. * @throws IOException when disks are unavailable or are full.
*/ */
public V chooseVolume(List<V> volumes, long replicaSize) throws IOException; V chooseVolume(List<V> volumes, long replicaSize, String storageId)
throws IOException;
} }
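The javadoc above says the hint can usually be ignored, but a policy that honors it is straightforward. A minimal sketch, not part of the patch, assuming FsVolumeSpi exposes getStorageID() and getAvailable():

  import java.io.IOException;
  import java.util.List;

  // Hypothetical policy: prefer the namenode-nominated volume and fall
  // back to round-robin when the hint is absent or unusable.
  class PreferNominatedVolumePolicy<V extends FsVolumeSpi>
      implements VolumeChoosingPolicy<V> {
    private final VolumeChoosingPolicy<V> fallback =
        new RoundRobinVolumeChoosingPolicy<>();

    @Override
    public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
        throws IOException {
      if (storageId != null) {
        for (V volume : volumes) {
          if (storageId.equals(volume.getStorageID())
              && volume.getAvailable() >= replicaSize) {
            return volume; // honor the hint when the volume has room
          }
        }
      }
      return fallback.chooseVolume(volumes, replicaSize, storageId);
    }
  }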
@ -927,7 +927,8 @@ static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
*/ */
@Override @Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
StorageType targetStorageType) throws IOException { StorageType targetStorageType, String targetStorageId)
throws IOException {
ReplicaInfo replicaInfo = getReplicaInfo(block); ReplicaInfo replicaInfo = getReplicaInfo(block);
if (replicaInfo.getState() != ReplicaState.FINALIZED) { if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException( throw new ReplicaNotFoundException(
@ -952,7 +953,8 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
FsVolumeReference volumeRef = null; FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes()); volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
} }
try { try {
moveBlock(block, replicaInfo, volumeRef); moveBlock(block, replicaInfo, volumeRef);
@ -1298,11 +1300,11 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
} }
} }
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public ReplicaHandler createRbw( public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) StorageType storageType, String storageId, ExtendedBlock b,
throws IOException { boolean allowLazyPersist) throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId()); b.getBlockId());
@ -1335,7 +1337,7 @@ public ReplicaHandler createRbw(
} }
if (ref == null) { if (ref == null) {
ref = volumes.getNextVolume(storageType, b.getNumBytes()); ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
} }
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@ -1503,7 +1505,8 @@ public ReplicaInPipeline convertTemporaryToRbw(
@Override // FsDatasetSpi @Override // FsDatasetSpi
public ReplicaHandler createTemporary( public ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException { StorageType storageType, String storageId, ExtendedBlock b)
throws IOException {
long startTimeMs = Time.monotonicNow(); long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null; ReplicaInfo lastFoundReplicaInfo = null;
@ -1516,7 +1519,7 @@ public ReplicaHandler createTemporary(
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
} }
FsVolumeReference ref = FsVolumeReference ref =
volumes.getNextVolume(storageType, b.getNumBytes()); volumes.getNextVolume(storageType, storageId, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo; ReplicaInPipeline newReplicaInfo;
try { try {
@ -2899,7 +2902,7 @@ private boolean saveNextReplica() {
replicaInfo.getVolume().isTransientStorage()) { replicaInfo.getVolume().isTransientStorage()) {
// Pick a target volume to persist the block. // Pick a target volume to persist the block.
targetReference = volumes.getNextVolume( targetReference = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes()); StorageType.DEFAULT, null, replicaInfo.getNumBytes());
targetVolume = (FsVolumeImpl) targetReference.getVolume(); targetVolume = (FsVolumeImpl) targetReference.getVolume();
ramDiskReplicaTracker.recordStartLazyPersist( ramDiskReplicaTracker.recordStartLazyPersist(
@ -81,10 +81,11 @@ List<FsVolumeImpl> getVolumes() {
return Collections.unmodifiableList(volumes); return Collections.unmodifiableList(volumes);
} }
private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize) private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
throws IOException { long blockSize, String storageId) throws IOException {
while (true) { while (true) {
FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize); FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
storageId);
try { try {
return volume.obtainReference(); return volume.obtainReference();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -100,18 +101,20 @@ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
* Get next volume. * Get next volume.
* *
* @param blockSize free space needed on the volume * @param blockSize free space needed on the volume
* @param storageType the desired {@link StorageType} * @param storageType the desired {@link StorageType}
* @param storageId the storage id which may or may not be used by
* the VolumeChoosingPolicy.
* @return next volume to store the block in. * @return next volume to store the block in.
*/ */
FsVolumeReference getNextVolume(StorageType storageType, long blockSize) FsVolumeReference getNextVolume(StorageType storageType, String storageId,
throws IOException { long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<>(volumes.size()); final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
for(FsVolumeImpl v : volumes) { for(FsVolumeImpl v : volumes) {
if (v.getStorageType() == storageType) { if (v.getStorageType() == storageType) {
list.add(v); list.add(v);
} }
} }
return chooseVolume(list, blockSize); return chooseVolume(list, blockSize, storageId);
} }
/** /**
@ -129,7 +132,7 @@ FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
list.add(v); list.add(v);
} }
} }
return chooseVolume(list, blockSize); return chooseVolume(list, blockSize, null);
} }
long getDfsUsed() throws IOException { long getDfsUsed() throws IOException {
@ -1018,7 +1018,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
// send the request // send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(), new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
new StorageType[]{StorageType.DEFAULT}); new StorageType[]{StorageType.DEFAULT},
new String[0]);
out.flush(); out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in); return BlockOpResponseProto.parseDelimitedFrom(in);
@ -1448,12 +1448,33 @@ public void testStorageTypeCheckAccess(){
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK, testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
StorageType.SSD, StorageType.ARCHIVE}, StorageType.SSD, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK}, false); new StorageType[]{StorageType.DISK}, false);
testStorageTypeCheckAccessResult(
new StorageType[]{StorageType.DISK, StorageType.SSD},
new StorageType[]{StorageType.SSD},
true);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK},
new StorageType[]{StorageType.DISK}, false);
testStorageTypeCheckAccessResult(
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK},
false);
} }
private void testStorageTypeCheckAccessResult(StorageType[] requested, private void testStorageTypeCheckAccessResult(StorageType[] requested,
StorageType[] allowed, boolean expAccess) { StorageType[] allowed, boolean expAccess) {
try { try {
BlockTokenSecretManager.checkAccess(requested, allowed); BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes");
if (!expAccess) { if (!expAccess) {
fail("No expected access with allowed StorageTypes " fail("No expected access with allowed StorageTypes "
+ Arrays.toString(allowed) + " and requested StorageTypes " + Arrays.toString(allowed) + " and requested StorageTypes "
@ -1467,4 +1488,56 @@ private void testStorageTypeCheckAccessResult(StorageType[] requested,
} }
} }
} }
@Test
public void testStorageIDCheckAccess() {
testStorageIDCheckAccessResult(
new String[]{"DN1-Storage1"},
new String[]{"DN1-Storage1"}, true);
testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
new String[]{"DN1-Storage1"},
true);
testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"},
new String[]{"DN1-Storage1", "DN1-Storage2"}, false);
testStorageIDCheckAccessResult(
new String[]{"DN1-Storage1", "DN1-Storage2"},
new String[]{"DN1-Storage1"}, true);
testStorageIDCheckAccessResult(
new String[]{"DN1-Storage1", "DN1-Storage2"},
new String[]{"DN2-Storage1"}, false);
testStorageIDCheckAccessResult(
new String[]{"DN1-Storage2", "DN2-Storage2"},
new String[]{"DN1-Storage1", "DN2-Storage1"}, false);
testStorageIDCheckAccessResult(new String[0], new String[0], false);
testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"},
true);
testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0],
false);
}
private void testStorageIDCheckAccessResult(String[] requested,
String[] allowed, boolean expAccess) {
try {
BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs");
if (!expAccess) {
fail("No expected access with allowed StorageIDs"
+ Arrays.toString(allowed) + " and requested StorageIDs"
+ Arrays.toString(requested));
}
} catch (SecretManager.InvalidToken e) {
if (expAccess) {
fail("Expected access with allowed StorageIDs "
+ Arrays.toString(allowed) + " and requested StorageIDs"
+ Arrays.toString(requested));
}
}
}
} }
@ -559,6 +559,7 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
BlockTokenSecretManager.DUMMY_TOKEN, "cl", BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage, new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS, 0, block.getNumBytes(), block.getNumBytes(), newGS,
checksum, CachingStrategy.newDefaultStrategy(), false, false, null); checksum, CachingStrategy.newDefaultStrategy(), false, false,
null, null, new String[0]);
} }
} }
@ -98,11 +98,11 @@ public FsDatasetChecker(DataStorage storage, Configuration conf) {
* correctly propagate the hint to FsDatasetSpi. * correctly propagate the hint to FsDatasetSpi.
*/ */
@Override @Override
public synchronized ReplicaHandler createRbw( public synchronized ReplicaHandler createRbw(StorageType storageType,
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) String storageId, ExtendedBlock b, boolean allowLazyPersist)
throws IOException { throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH)); assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
return super.createRbw(storageType, b, allowLazyPersist); return super.createRbw(storageType, storageId, b, allowLazyPersist);
} }
} }
} }
@ -151,7 +151,7 @@ public GetReplicaVisibleLengthResponseProto answer(
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()), sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
BlockTokenIdentifier.AccessMode.WRITE, BlockTokenIdentifier.AccessMode.WRITE,
new StorageType[]{StorageType.DEFAULT}); new StorageType[]{StorageType.DEFAULT}, null);
result = id.getBlockId(); result = id.getBlockId();
} }
return GetReplicaVisibleLengthResponseProto.newBuilder() return GetReplicaVisibleLengthResponseProto.newBuilder()
@ -160,11 +160,11 @@ public GetReplicaVisibleLengthResponseProto answer(
} }
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm, private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
ExtendedBlock block, ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
EnumSet<BlockTokenIdentifier.AccessMode> accessModes, StorageType[] storageTypes, String[] storageIds)
StorageType... storageTypes) throws IOException { throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes, Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
storageTypes); storageTypes, storageIds);
BlockTokenIdentifier id = sm.createIdentifier(); BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier()))); .getIdentifier())));
@ -178,29 +178,28 @@ private void testWritable(boolean enableProtobuf) throws Exception {
enableProtobuf); enableProtobuf);
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block1, TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block2, TestWritable.testWritable(generateTokenId(sm, block2,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT)); new StorageType[]{StorageType.DEFAULT}, null));
// We must be backwards compatible when adding storageType // We must be backwards compatible when adding storageType
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
(StorageType[]) null));
TestWritable.testWritable(generateTokenId(sm, block3, TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.EMPTY_ARRAY)); StorageType.EMPTY_ARRAY, null));
} }
@Test @Test
@ -215,35 +214,36 @@ public void testWritableProtobuf() throws Exception {
private static void checkAccess(BlockTokenSecretManager m, private static void checkAccess(BlockTokenSecretManager m,
Token<BlockTokenIdentifier> t, ExtendedBlock blk, Token<BlockTokenIdentifier> t, ExtendedBlock blk,
BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken { BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT }); String[] storageIds) throws SecretManager.InvalidToken {
m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
} }
private void tokenGenerationAndVerification(BlockTokenSecretManager master, private void tokenGenerationAndVerification(BlockTokenSecretManager master,
BlockTokenSecretManager slave, StorageType... storageTypes) BlockTokenSecretManager slave, StorageType[] storageTypes,
throws Exception { String[] storageIds) throws Exception {
// single-mode tokens // single-mode tokens
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) { .values()) {
// generated by master // generated by master
Token<BlockTokenIdentifier> token1 = master.generateToken(block1, Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
EnumSet.of(mode), storageTypes); EnumSet.of(mode), storageTypes, storageIds);
checkAccess(master, token1, block1, mode); checkAccess(master, token1, block1, mode, storageTypes, storageIds);
checkAccess(slave, token1, block1, mode); checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
// generated by slave // generated by slave
Token<BlockTokenIdentifier> token2 = slave.generateToken(block2, Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
EnumSet.of(mode), storageTypes); EnumSet.of(mode), storageTypes, storageIds);
checkAccess(master, token2, block2, mode); checkAccess(master, token2, block2, mode, storageTypes, storageIds);
checkAccess(slave, token2, block2, mode); checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
} }
// multi-mode tokens // multi-mode tokens
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3, Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
storageTypes); storageTypes, storageIds);
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) { .values()) {
checkAccess(master, mtoken, block3, mode); checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
checkAccess(slave, mtoken, block3, mode); checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
} }
} }
@ -259,18 +259,18 @@ private void testBlockTokenSecretManager(boolean enableProtobuf)
ExportedBlockKeys keys = masterHandler.exportKeys(); ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys); slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler, tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, null);
tokenGenerationAndVerification(masterHandler, slaveHandler, null); tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
// key updating // key updating
masterHandler.updateKeys(); masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, slaveHandler, tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, null);
tokenGenerationAndVerification(masterHandler, slaveHandler, null); tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
keys = masterHandler.exportKeys(); keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys); slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler, tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, null);
tokenGenerationAndVerification(masterHandler, slaveHandler, null); tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
} }
@Test @Test
@ -315,7 +315,7 @@ private void testBlockTokenRpc(boolean enableProtobuf) throws Exception {
enableProtobuf); enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3, Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT}); new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf); final Server server = createMockDatanode(sm, token, conf);
@ -365,7 +365,7 @@ private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception {
enableProtobuf); enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3, Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT}); new StorageType[]{StorageType.DEFAULT}, new String[0]);
final Server server = createMockDatanode(sm, token, conf); final Server server = createMockDatanode(sm, token, conf);
server.start(); server.start();
@ -451,19 +451,23 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
ExportedBlockKeys keys = masterHandler.exportKeys(); ExportedBlockKeys keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys); bpMgr.addKeys(bpid, keys);
String[] storageIds = new String[] {"DS-9001"};
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, storageIds);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
null);
// Test key updating // Test key updating
masterHandler.updateKeys(); masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, storageIds);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
null);
keys = masterHandler.exportKeys(); keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys); bpMgr.addKeys(bpid, keys);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT); new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
null);
} }
} }
@ -540,7 +544,7 @@ public void testLegacyBlockTokenBytesIsLegacy() throws IOException {
useProto); useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1, Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT}); new StorageType[]{StorageType.DEFAULT}, new String[0]);
final byte[] tokenBytes = token.getIdentifier(); final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@ -605,7 +609,7 @@ public void testProtobufBlockTokenBytesIsProtobuf() throws IOException {
useProto); useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1, Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.EMPTY_ARRAY); StorageType.EMPTY_ARRAY, new String[0]);
final byte[] tokenBytes = token.getIdentifier(); final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@ -699,7 +703,8 @@ public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
*/ */
BlockTokenIdentifier identifier = new BlockTokenIdentifier("user", BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
"blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true); new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new String[] {"fake-storage-id"}, true);
Calendar cal = new GregorianCalendar(); Calendar cal = new GregorianCalendar();
cal.set(2017, 1, 9, 0, 12, 35); cal.set(2017, 1, 9, 0, 12, 35);
long datetime = cal.getTimeInMillis(); long datetime = cal.getTimeInMillis();
@ -749,7 +754,8 @@ private void testBlockTokenSerialization(boolean useProto) throws
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.DISK, StorageType.ARCHIVE}; StorageType.DISK, StorageType.ARCHIVE};
BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool", BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
123, accessModes, storageTypes, useProto); 123, accessModes, storageTypes, new String[] {"fake-storage-id"},
useProto);
ident.setExpiryDate(1487080345L); ident.setExpiryDate(1487080345L);
BlockTokenIdentifier ret = writeAndReadBlockToken(ident); BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
assertEquals(ret.getExpiryDate(), 1487080345L); assertEquals(ret.getExpiryDate(), 1487080345L);
@ -760,6 +766,7 @@ private void testBlockTokenSerialization(boolean useProto) throws
assertEquals(ret.getAccessModes(), assertEquals(ret.getAccessModes(),
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
assertArrayEquals(ret.getStorageTypes(), storageTypes); assertArrayEquals(ret.getStorageTypes(), storageTypes);
assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"});
} }
@Test @Test
@ -767,5 +774,4 @@ public void testBlockTokenSerialization() throws IOException {
testBlockTokenSerialization(false); testBlockTokenSerialization(false);
testBlockTokenSerialization(true); testBlockTokenSerialization(true);
} }
} }
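Pulling the token changes together, a hedged end-to-end sketch (secret-manager and block setup assumed) of minting and verifying a token pinned to both a storage type and a storage ID; the signatures follow the calls exercised in the tests above:

  // Sketch: the token now carries storage IDs next to storage types, and
  // checkAccess verifies both against what the request asks for.
  Token<BlockTokenIdentifier> token = sm.generateToken(block,
      EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
      new StorageType[]{StorageType.DISK}, new String[]{"DS-9001"});
  sm.checkAccess(token, null, block, BlockTokenIdentifier.AccessMode.WRITE,
      new StorageType[]{StorageType.DISK}, new String[]{"DS-9001"});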
@ -389,7 +389,7 @@ public void blockReport_04() throws IOException {
// Create a bogus new block which will not be present on the namenode. // Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock( ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong()); poolId, rand.nextLong(), 1024L, rand.nextLong());
dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false); dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
@ -1023,21 +1023,22 @@ public synchronized ReplicaHandler recoverRbw(
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaHandler createRbw( public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException { boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b); return createTemporary(storageType, storageId, b);
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary( public synchronized ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException { StorageType storageType, String storageId, ExtendedBlock b)
throws IOException {
if (isValidBlock(b)) { if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b + throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to."); " is valid, and cannot be written to.");
} }
if (isValidRbw(b)) { if (isValidRbw(b)) {
throw new ReplicaAlreadyExistsException("Block " + b + throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to."); " is being written, and cannot be written to.");
} }
final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
@ -1419,7 +1420,7 @@ public void onFailLazyPersist(String bpId, long blockId) {
@Override @Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
StorageType targetStorageType) throws IOException { StorageType targetStorageType, String storageId) throws IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@ -647,7 +647,7 @@ public void testNoReplicaUnderRecovery() throws IOException {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
dn.data.createRbw(StorageType.DEFAULT, block, false); dn.data.createRbw(StorageType.DEFAULT, null, block, false);
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
recoveryWorker.new RecoveryTaskContiguous(rBlock); recoveryWorker.new RecoveryTaskContiguous(rBlock);
try { try {
@ -673,7 +673,7 @@ public void testNotMatchedReplicaID() throws IOException {
LOG.debug("Running " + GenericTestUtils.getMethodName()); LOG.debug("Running " + GenericTestUtils.getMethodName());
} }
ReplicaInPipeline replicaInfo = dn.data.createRbw( ReplicaInPipeline replicaInfo = dn.data.createRbw(
StorageType.DEFAULT, block, false).getReplica(); StorageType.DEFAULT, null, block, false).getReplica();
ReplicaOutputStreams streams = null; ReplicaOutputStreams streams = null;
try { try {
streams = replicaInfo.createStreams(true, streams = replicaInfo.createStreams(true,
@ -972,7 +972,7 @@ public void run() {
// Register this thread as the writer for the recoveringBlock. // Register this thread as the writer for the recoveringBlock.
LOG.debug("slowWriter creating rbw"); LOG.debug("slowWriter creating rbw");
ReplicaHandler replicaHandler = ReplicaHandler replicaHandler =
spyDN.data.createRbw(StorageType.DISK, block, false); spyDN.data.createRbw(StorageType.DISK, null, block, false);
replicaHandler.close(); replicaHandler.close();
LOG.debug("slowWriter created rbw"); LOG.debug("slowWriter created rbw");
// Tell the parent thread to start progressing. // Tell the parent thread to start progressing.
@ -394,7 +394,7 @@ private boolean replaceBlock(
DataOutputStream out = new DataOutputStream(sock.getOutputStream()); DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType, new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
sourceProxy); sourceProxy, null);
out.flush(); out.flush();
// receiveResponse // receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream()); DataInputStream reply = new DataInputStream(sock.getInputStream());
@ -129,7 +129,7 @@ private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist)
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0), DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
CachingStrategy.newDefaultStrategy(), CachingStrategy.newDefaultStrategy(),
lazyPersist, lazyPersist,
false, null); false, null, null, new String[0]);
} }
// Helper functions to setup the mock objects. // Helper functions to setup the mock objects.
@ -151,7 +151,7 @@ private static DataXceiver makeStubDataXceiver(
any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(), any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
anyString(), any(DatanodeInfo.class), any(DataNode.class), anyString(), any(DatanodeInfo.class), any(DataNode.class),
any(DataChecksum.class), any(CachingStrategy.class), any(DataChecksum.class), any(CachingStrategy.class),
captor.capture(), anyBoolean()); captor.capture(), anyBoolean(), any(String.class));
doReturn(mock(DataOutputStream.class)).when(xceiverSpy) doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
.getBufferedOutputStream(); .getBufferedOutputStream();
return xceiverSpy; return xceiverSpy;
@ -167,7 +167,8 @@ public void testReplicationError() throws Exception {
BlockTokenSecretManager.DUMMY_TOKEN, "", BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null, new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum, CachingStrategy.newDefaultStrategy(), false, false, null); checksum, CachingStrategy.newDefaultStrategy(), false, false,
null, null, new String[0]);
out.flush(); out.flush();
// close the connection before sending the content of the block // close the connection before sending the content of the block
@ -274,7 +275,7 @@ public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
dn1.getDatanodeId()); dn1.getDatanodeId());
dn0.transferBlock(block, new DatanodeInfo[]{dnd1}, dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
new StorageType[]{StorageType.DISK}); new StorageType[]{StorageType.DISK}, new String[0]);
// Sleep for 1 second so the DataTransfer daemon can start transfer. // Sleep for 1 second so the DataTransfer daemon can start transfer.
try { try {
Thread.sleep(1000); Thread.sleep(1000);
@ -81,7 +81,7 @@ static int addSomeBlocks(SimulatedFSDataset fsdataset, long startingBlockId,
// we pass the expected len as zero; fsdataset should use the size of actual // we pass the expected len as zero; fsdataset should use the size of actual
// data written // data written
ReplicaInPipeline bInfo = fsdataset.createRbw( ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica(); StorageType.DEFAULT, null, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true, ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try { try {
@ -368,7 +368,7 @@ public void run() {
ExtendedBlock block = new ExtendedBlock(newbpid,1); ExtendedBlock block = new ExtendedBlock(newbpid,1);
try { try {
// it will throw an exception if the block pool is not found // it will throw an exception if the block pool is not found
fsdataset.createTemporary(StorageType.DEFAULT, block); fsdataset.createTemporary(StorageType.DEFAULT, null, block);
} catch (IOException ioe) { } catch (IOException ioe) {
// JUnit does not capture exception in non-main thread, // JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later. // so cache it and then let main thread throw later.
@@ -138,14 +138,15 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
   }
   @Override
-  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType t, String i,
+      ExtendedBlock b)
       throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
   @Override
-  public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
-      throws IOException {
+  public ReplicaHandler createRbw(StorageType storageType, String id,
+      ExtendedBlock b, boolean tf) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
@@ -332,7 +333,8 @@ public void onFailLazyPersist(String bpId, long blockId) {
   }
   @Override
-  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException {
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType, String storageId) throws IOException {
     return null;
   }
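The new String parameter on createTemporary/createRbw carries the namenode-chosen storage ID down into the dataset; implementations do not interpret it themselves but forward it to volume selection. A self-contained sketch of that routing step, assuming a policy object is at hand (class and method names here are illustrative, not the patch's FsDatasetImpl code):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;

    final class VolumeRouter {
      private final VolumeChoosingPolicy<FsVolumeSpi> policy;

      VolumeRouter(VolumeChoosingPolicy<FsVolumeSpi> policy) {
        this.policy = policy;
      }

      // Pick a volume for a new replica. A null storageId means "no hint",
      // leaving the decision entirely to the configured policy.
      FsVolumeSpi volumeForNewReplica(List<FsVolumeSpi> volumes,
          long replicaSize, String storageId) throws IOException {
        return policy.chooseVolume(volumes, replicaSize, storageId);
      }
    }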

View File

@@ -89,10 +89,12 @@ public void testTwoUnbalancedVolumes() throws Exception {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   @Test(timeout=60000)
@@ -115,21 +117,29 @@ public void testThreeUnbalancedVolumes() throws Exception {
     // Third volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
     // All writes should be assigned to the volume with the least free space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
   }
   @Test(timeout=60000)
@@ -156,22 +166,30 @@ public void testFourUnbalancedVolumes() throws Exception {
     // Fourth volume, again with 3MB free space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
     // We should alternate assigning between the two volumes with a lot of free
     // space.
     initPolicy(policy, 1.0f);
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100,
+        null));
     // We should alternate assigning between the two volumes with less free
     // space.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100,
+        null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100,
+        null));
   }
   @Test(timeout=60000)
@@ -190,13 +208,14 @@ public void testNotEnoughSpaceOnSelectedVolume() throws Exception {
     // than the threshold of 1MB.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
     // All writes should be assigned to the volume with the least free space.
     // However, if the volume with the least free space doesn't have enough
     // space to accept the replica size, and another volume does have enough
     // free space, that should be chosen instead.
     initPolicy(policy, 0.0f);
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        1024L * 1024L * 2, null));
   }
   @Test(timeout=60000)
@@ -220,10 +239,11 @@ public void testAvailableSpaceChanges() throws Exception {
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 3)
         .thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
     // Should still be able to get a volume for the replica even though the
     // available space on the second volume changed.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes,
+        100, null));
   }
   @Test(timeout=60000)
@@ -271,12 +291,12 @@ public void doRandomizedTest(float preferencePercent, int lowSpaceVolumes,
       Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
       volumes.add(volume);
     }
     initPolicy(policy, preferencePercent);
     long lowAvailableSpaceVolumeSelected = 0;
     long highAvailableSpaceVolumeSelected = 0;
     for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
-      FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
+      FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null);
       for (int j = 0; j < volumes.size(); j++) {
         // Note how many times the first low available volume was selected
         if (volume == volumes.get(j) && j == 0) {

View File

@@ -50,20 +50,21 @@ public static void testRR(VolumeChoosingPolicy<FsVolumeSpi> policy)
     // Second volume, with 200 bytes of space.
     volumes.add(Mockito.mock(FsVolumeSpi.class));
     Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
     // Test two rounds of round-robin choosing
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null));
     // The first volume has only 100L space, so the policy should
     // wisely choose the second one in case we ask for more.
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150,
+        null));
     // Fail if no volume can be chosen?
     try {
-      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      policy.chooseVolume(volumes, Long.MAX_VALUE, null);
       Assert.fail();
     } catch (IOException e) {
       // Passed.
@@ -93,7 +94,7 @@ public static void testRRPolicyExceptionMessage(
     int blockSize = 700;
     try {
-      policy.chooseVolume(volumes, blockSize);
+      policy.chooseVolume(volumes, blockSize, null);
       Assert.fail("expected to throw DiskOutOfSpaceException");
     } catch(DiskOutOfSpaceException e) {
       Assert.assertEquals("Not returning the expected message",
@@ -137,21 +138,21 @@ public static void testRRPolicyWithStorageTypes(
     Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
     Assert.assertEquals(diskVolumes.get(0),
-        policy.chooseVolume(diskVolumes, 0));
+        policy.chooseVolume(diskVolumes, 0, null));
     // Independent Round-Robin for different storage type
     Assert.assertEquals(ssdVolumes.get(0),
-        policy.chooseVolume(ssdVolumes, 0));
+        policy.chooseVolume(ssdVolumes, 0, null));
     // Take block size into consideration
     Assert.assertEquals(ssdVolumes.get(0),
-        policy.chooseVolume(ssdVolumes, 150L));
+        policy.chooseVolume(ssdVolumes, 150L, null));
     Assert.assertEquals(diskVolumes.get(1),
-        policy.chooseVolume(diskVolumes, 0));
+        policy.chooseVolume(diskVolumes, 0, null));
     Assert.assertEquals(diskVolumes.get(0),
-        policy.chooseVolume(diskVolumes, 50L));
+        policy.chooseVolume(diskVolumes, 50L, null));
     try {
-      policy.chooseVolume(diskVolumes, 200L);
+      policy.chooseVolume(diskVolumes, 200L, null);
       Assert.fail("Should throw a DiskOutOfSpaceException before this!");
     } catch (DiskOutOfSpaceException e) {
       // Pass.
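These tests pass null for the new third argument because the stock policies ignore the hint; the parameter exists so that a custom policy can act on it. A sketch of such a policy, under the assumption that FsVolumeSpi exposes getStorageID() and getAvailable() as on trunk; the class name is invented (compare the TestVolumeChoosingPolicy stub added later in this patch, which asserts on the hint rather than using it):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;

    public class StorageIdAwarePolicy<V extends FsVolumeSpi>
        extends RoundRobinVolumeChoosingPolicy<V> {
      @Override
      public V chooseVolume(List<V> volumes, long replicaSize,
          String storageId) throws IOException {
        if (storageId != null) {
          for (V vol : volumes) {
            // Honor the namenode's hint when the hinted volume has room.
            if (storageId.equals(vol.getStorageID())
                && vol.getAvailable() >= replicaSize) {
              return vol;
            }
          }
        }
        // No usable hint: fall back to plain round-robin.
        return super.chooseVolume(volumes, replicaSize, storageId);
      }
    }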

View File

@@ -259,7 +259,7 @@ public void testRemoveVolumes() throws IOException {
       String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
       try (ReplicaHandler replica =
-          dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
       }
     }
     final String[] dataDirs =
@@ -566,7 +566,7 @@ public void run() {
     class ResponderThread extends Thread {
       public void run() {
         try (ReplicaHandler replica = dataset
-            .createRbw(StorageType.DEFAULT, eb, false)) {
+            .createRbw(StorageType.DEFAULT, null, eb, false)) {
           LOG.info("CreateRbw finished");
           startFinalizeLatch.countDown();

View File

@@ -101,7 +101,7 @@ public Boolean get() {
     }
     for (int i = 0; i < 10; i++) {
       try (FsVolumeReference ref =
-          volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+          volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) {
         // volume No.2 will not be chosen.
         assertNotEquals(ref.getVolume(), volumes.get(1));
       }

View File

@@ -353,7 +353,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
     }
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
           "finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -371,7 +371,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
     }
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
           "temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -381,7 +381,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
         0L, blocks[RBW].getNumBytes()); // expect to be successful
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -397,7 +397,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
     }
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
           "recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -413,7 +413,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
     }
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
+      dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -430,49 +430,49 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
           e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
     }
-    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
+    dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false);
   }
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
       Assert.fail("Should not have created a temporary replica that was " +
           "finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
       Assert.fail("Should not have created a replica that had created as" +
           "temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
       Assert.fail("Should not have created a replica that was waiting to be " +
           "recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
     }
-    dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
     try {
-      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
       Assert.fail("Should not have created a replica that had already been "
           + "created " + blocks[NON_EXISTENT]);
     } catch (Exception e) {
@@ -485,7 +485,8 @@ private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks)
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
       ReplicaInPipeline replicaInfo =
-          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
+          dataSet.createTemporary(StorageType.DEFAULT, null,
+              blocks[NON_EXISTENT]).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());

View File

@@ -0,0 +1,330 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.net.Node;
import org.junit.After;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test to ensure that the StorageType and StorageID sent from Namenode
* to DFSClient are respected.
*/
public class TestNamenodeStorageDirectives {
public static final Logger LOG =
LoggerFactory.getLogger(TestNamenodeStorageDirectives.class);
private static final int BLOCK_SIZE = 512;
private MiniDFSCluster cluster;
@After
public void tearDown() {
shutdown();
}
private void startDFSCluster(int numNameNodes, int numDataNodes,
int storagePerDataNode, StorageType[][] storageTypes)
throws IOException {
startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode,
storageTypes, RoundRobinVolumeChoosingPolicy.class,
BlockPlacementPolicyDefault.class);
}
private void startDFSCluster(int numNameNodes, int numDataNodes,
int storagePerDataNode, StorageType[][] storageTypes,
Class<? extends VolumeChoosingPolicy> volumeChoosingPolicy,
Class<? extends BlockPlacementPolicy> blockPlacementPolicy) throws
IOException {
shutdown();
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
/*
* Lower the DN heartbeat, DF rate, and recheck interval to one second
* so state about failures and datanode death propagates faster.
*/
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
/* Allow 1 volume failure */
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
conf.setClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
volumeChoosingPolicy, VolumeChoosingPolicy.class);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
blockPlacementPolicy, BlockPlacementPolicy.class);
MiniDFSNNTopology nnTopology =
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
.storagesPerDatanode(storagePerDataNode)
.storageTypes(storageTypes)
.build();
cluster.waitActive();
}
private void shutdown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void createFile(Path path, int numBlocks, short replicateFactor)
throws IOException, InterruptedException, TimeoutException {
createFile(0, path, numBlocks, replicateFactor);
}
private void createFile(int fsIdx, Path path, int numBlocks,
short replicateFactor)
throws IOException, TimeoutException, InterruptedException {
final int seed = 0;
final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
replicateFactor, seed);
DFSTestUtil.waitReplication(fs, path, replicateFactor);
}
private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks,
StorageType storageType) throws IOException {
MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0];
InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
assert addr.getPort() != 0;
DFSClient client = new DFSClient(addr, cluster.getConfiguration(0));
FileSystem fs = cluster.getFileSystem();
if (!fs.exists(path)) {
LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path);
return false;
}
long fileLength = client.getFileInfo(path.toString()).getLen();
int foundBlocks = 0;
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
for (StorageType st : locatedBlock.getStorageTypes()) {
if (st == storageType) {
foundBlocks++;
}
}
}
LOG.info("Found {}/{} blocks on StorageType {}",
foundBlocks, numBlocks, storageType);
final boolean isValid = foundBlocks >= numBlocks;
return isValid;
}
private void testStorageTypes(StorageType[][] storageTypes,
String storagePolicy, StorageType[] expectedStorageTypes,
StorageType[] unexpectedStorageTypes) throws ReconfigurationException,
InterruptedException, TimeoutException, IOException {
final int numDataNodes = storageTypes.length;
final int storagePerDataNode = storageTypes[0].length;
startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes);
cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy);
Path testFile = new Path("/test");
final short replFactor = 2;
final int numBlocks = 10;
createFile(testFile, numBlocks, replFactor);
for (StorageType storageType: expectedStorageTypes) {
assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks,
storageType));
}
for (StorageType storageType: unexpectedStorageTypes) {
assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks,
storageType));
}
}
/**
* Verify that writing to SSD and DISK will write to the correct Storage
* Types.
* @throws IOException
*/
@Test(timeout=60000)
public void testTargetStorageTypes() throws ReconfigurationException,
InterruptedException, TimeoutException, IOException {
// ONE_SSD: one replica on SSD, the rest on DISK; nothing else.
testStorageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}},
"ONE_SSD",
new StorageType[]{StorageType.SSD, StorageType.DISK},
new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE});
// only on SSD.
testStorageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}},
"ALL_SSD",
new StorageType[]{StorageType.SSD},
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
StorageType.ARCHIVE});
// only on SSD.
testStorageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.DISK, StorageType.DISK},
{StorageType.SSD, StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK, StorageType.DISK}},
"ALL_SSD",
new StorageType[]{StorageType.SSD},
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK,
StorageType.ARCHIVE});
// DISK and not anything else.
testStorageTypes(new StorageType[][] {
{StorageType.RAM_DISK, StorageType.SSD},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}},
"HOT",
new StorageType[]{StorageType.DISK},
new StorageType[] {StorageType.RAM_DISK, StorageType.SSD,
StorageType.ARCHIVE});
testStorageTypes(new StorageType[][] {
{StorageType.RAM_DISK, StorageType.SSD},
{StorageType.SSD, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
"WARM",
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD});
testStorageTypes(new StorageType[][] {
{StorageType.RAM_DISK, StorageType.SSD},
{StorageType.SSD, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
"COLD",
new StorageType[]{StorageType.ARCHIVE},
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.DISK});
// We wait for Lazy Persist to write to disk.
testStorageTypes(new StorageType[][] {
{StorageType.RAM_DISK, StorageType.SSD},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}},
"LAZY_PERSIST",
new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.ARCHIVE});
}
/**
* A VolumeChoosingPolicy test stub used to verify that the storage ID
* chosen by block placement reaches the volume-choosing layer unchanged.
* @param <V> the volume type
*/
private static class TestVolumeChoosingPolicy<V extends FsVolumeSpi>
extends RoundRobinVolumeChoosingPolicy<V> {
static String expectedStorageId;
@Override
public V chooseVolume(List<V> volumes, long replicaSize, String storageId)
throws IOException {
assertEquals(expectedStorageId, storageId);
return super.chooseVolume(volumes, replicaSize, storageId);
}
}
private static class TestBlockPlacementPolicy
extends BlockPlacementPolicyDefault {
static DatanodeStorageInfo[] dnStorageInfosToReturn;
@Override
public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
Node writer, List<DatanodeStorageInfo> chosenNodes,
boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize,
final BlockStoragePolicy storagePolicy, EnumSet<AddBlockFlag> flags) {
return dnStorageInfosToReturn;
}
}
private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex)
throws UnregisteredNodeException {
if (cluster == null) {
return null;
}
DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId();
DatanodeManager dnManager = cluster.getNamesystem()
.getBlockManager().getDatanodeManager();
return dnManager.getDatanode(dnId).getStorageInfos()[0];
}
@Test(timeout=60000)
public void testStorageIDBlockPlacementSpecific()
throws ReconfigurationException, InterruptedException, TimeoutException,
IOException {
final StorageType[][] storageTypes = {
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
};
final int numDataNodes = storageTypes.length;
final int storagePerDataNode = storageTypes[0].length;
startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes,
TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class);
Path testFile = new Path("/test");
final short replFactor = 1;
final int numBlocks = 10;
DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0);
TestBlockPlacementPolicy.dnStorageInfosToReturn =
new DatanodeStorageInfo[] {dnInfoToUse};
TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID();
// File creation invokes both BlockPlacementPolicy and VolumeChoosingPolicy,
// and will test that the storage IDs match.
createFile(testFile, numBlocks, replFactor);
}
}
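To exercise just this new test class locally, the usual surefire filter should work from the hadoop-hdfs module, e.g. mvn test -Dtest=TestNamenodeStorageDirectives; the exact module path depends on the checkout.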