Merge r1609845 through r1612502 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612505 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-07-22 08:23:34 +00:00
commit ee13f8bf3e
37 changed files with 656 additions and 218 deletions

View File

@ -324,6 +324,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
datanodes and change datanode to write block replicas using the specified
storage type. (szetszwo)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -313,6 +313,7 @@ class DataStreamer extends Daemon {
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
CacheBuilder.newBuilder()
@ -417,10 +418,12 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
}
private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageIDs());
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
}
private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
String[] storageIDs) {
this.nodes = nodes;
this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
}
@ -446,7 +449,7 @@ private void endBlock() {
this.setName("DataStreamer for file " + src);
closeResponder();
closeStream();
setPipeline(null, null);
setPipeline(null, null, null);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@ -1031,10 +1034,12 @@ private void addDatanode2ExistingPipeline() throws IOException {
//transfer replica
final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
final DatanodeInfo[] targets = {nodes[d]};
transfer(src, targets, lb.getBlockToken());
final StorageType[] targetStorageTypes = {storageTypes[d]};
transfer(src, targets, targetStorageTypes, lb.getBlockToken());
}
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//transfer replica to the new datanode
Socket sock = null;
@ -1056,7 +1061,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets);
targets, targetStorageTypes);
out.flush();
//ack
@ -1135,16 +1140,15 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
failed.add(nodes[errorIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
arraycopy(nodes, newnodes, errorIndex);
final StorageType[] newStorageTypes = new StorageType[newnodes.length];
arraycopy(storageTypes, newStorageTypes, errorIndex);
final String[] newStorageIDs = new String[newnodes.length];
System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
newStorageIDs.length-errorIndex);
arraycopy(storageIDs, newStorageIDs, errorIndex);
setPipeline(newnodes, newStorageIDs);
setPipeline(newnodes, newStorageTypes, newStorageIDs);
// Just took care of a node error while waiting for a node restart
if (restartingNodeIndex >= 0) {
@ -1181,7 +1185,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, newGS, isRecovery);
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
@ -1190,7 +1194,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
success = createBlockOutputStream(nodes, newGS, isRecovery);
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
}
if (restartingNodeIndex >= 0) {
@ -1242,6 +1246,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
private LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
@ -1264,11 +1269,12 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();
//
// Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, 0L, false);
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
@ -1289,8 +1295,8 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
private boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
DFSClient.LOG.info("nodes are empty for write pipeline of block "
+ block);
@ -1332,9 +1338,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
// Xmit header info to datanode
//
BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get());
@ -2197,4 +2204,9 @@ ExtendedBlock getBlock() {
public long getFileId() {
return fileId;
}
private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
System.arraycopy(srcs, 0, dsts, 0, skipIndex);
System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
}
}

View File

@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -71,11 +72,20 @@ public void readBlock(final ExtendedBlock blk,
/**
* Write a block to a datanode pipeline.
*
* The receiver datanode of this call is the next datanode in the pipeline.
* The other downstream datanodes are specified by the targets parameter.
* Note that the receiver {@link DatanodeInfo} is not required in the
* parameter list since the receiver datanode knows its info. However, the
* {@link StorageType} for storing the replica in the receiver datanode is a
* parameter since the receiver datanode may support multiple storage types.
*
* @param blk the block being written.
* @param storageType for storing the replica in the receiver datanode.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
* @param targets target datanodes in the pipeline.
* @param targets other downstream datanodes in the pipeline.
* @param targetStorageTypes target {@link StorageType}s corresponding
* to the target datanodes.
* @param source source datanode.
* @param stage pipeline stage.
* @param pipelineSize the size of the pipeline.
@ -84,9 +94,11 @@ public void readBlock(final ExtendedBlock blk,
* @param latestGenerationStamp the latest generation stamp of the block.
*/
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@ -110,7 +122,8 @@ public void writeBlock(final ExtendedBlock blk,
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException;
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes) throws IOException;
/**
* Request short circuit access file descriptors from a DataNode.
@ -148,11 +161,13 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
* It is used for balancing purpose.
*
* @param blk the block being replaced.
* @param storageType the {@link StorageType} for storing the block.
* @param blockToken security token for accessing the block.
* @param delHint the hint for deleting the block in the original datanode.
* @param source the source datanode for receiving the block.
*/
public void replaceBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException;

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@ -121,10 +122,13 @@ private void opReadBlock() throws IOException {
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
PBHelper.convert(proto.getTargetsList()),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
@ -140,10 +144,12 @@ private void opWriteBlock(DataInputStream in) throws IOException {
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
PBHelper.convert(proto.getTargetsList()));
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@ -176,6 +182,7 @@ private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@ -111,9 +112,11 @@ public void readBlock(final ExtendedBlock blk,
@Override
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
@ -130,7 +133,9 @@ public void writeBlock(final ExtendedBlock blk,
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.setStorageType(PBHelper.convertStorageType(storageType))
.addAllTargets(PBHelper.convert(targets, 1))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
@ -150,12 +155,14 @@ public void writeBlock(final ExtendedBlock blk,
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken))
.addAllTargets(PBHelper.convert(targets))
.addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
@ -196,11 +203,13 @@ public void requestShortCircuitShm(String clientName) throws IOException {
@Override
public void replaceBlock(final ExtendedBlock blk,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setStorageType(PBHelper.convertStorageType(storageType))
.setDelHint(delHint)
.setSource(PBHelper.convertDatanodeInfo(source))
.build();

View File

@ -150,6 +150,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
@ -674,14 +675,8 @@ public static LocatedBlock convert(LocatedBlockProto proto) {
targets[i] = PBHelper.convert(locs.get(i));
}
final int storageTypesCount = proto.getStorageTypesCount();
final StorageType[] storageTypes;
if (storageTypesCount == 0) {
storageTypes = null;
} else {
Preconditions.checkState(storageTypesCount == locs.size());
storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
}
final StorageType[] storageTypes = convertStorageTypes(
proto.getStorageTypesList(), locs.size());
final int storageIDsCount = proto.getStorageIDsCount();
final String[] storageIDs;
@ -969,6 +964,20 @@ public static BlockCommand convert(BlockCommandProto blkCmd) {
targets[i] = PBHelper.convert(targetList.get(i));
}
StorageType[][] targetStorageTypes = new StorageType[targetList.size()][];
List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList();
if (targetStorageTypesList.isEmpty()) { // missing storage types
for(int i = 0; i < targetStorageTypes.length; i++) {
targetStorageTypes[i] = new StorageType[targets[i].length];
Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT);
}
} else {
for(int i = 0; i < targetStorageTypes.length; i++) {
List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
}
}
List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
for(int i = 0; i < targetStorageIDs.length; i++) {
@ -991,7 +1000,7 @@ public static BlockCommand convert(BlockCommandProto blkCmd) {
throw new AssertionError("Unknown action type: " + blkCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
targetStorageIDs);
targetStorageTypes, targetStorageIDs);
}
public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@ -1605,8 +1614,25 @@ private static StorageState convertState(State state) {
}
}
private static StorageTypeProto convertStorageType(
StorageType type) {
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types) {
return convertStorageTypes(types, 0);
}
public static List<StorageTypeProto> convertStorageTypes(
StorageType[] types, int startIdx) {
if (types == null) {
return null;
}
final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>(
types.length);
for (int i = startIdx; i < types.length; ++i) {
protos.add(convertStorageType(types[i]));
}
return protos;
}
public static StorageTypeProto convertStorageType(StorageType type) {
switch(type) {
case DISK:
return StorageTypeProto.DISK;
@ -1623,7 +1649,7 @@ private static StorageTypeProto convertStorageType(
public static DatanodeStorage convert(DatanodeStorageProto s) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
PBHelper.convertType(s.getStorageType()));
PBHelper.convertStorageType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@ -1636,7 +1662,7 @@ private static State convertState(StorageState state) {
}
}
private static StorageType convertType(StorageTypeProto type) {
public static StorageType convertStorageType(StorageTypeProto type) {
switch(type) {
case DISK:
return StorageType.DISK;
@ -1650,11 +1676,16 @@ private static StorageType convertType(StorageTypeProto type) {
}
}
private static StorageType[] convertStorageTypeProtos(
List<StorageTypeProto> storageTypesList) {
final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
for (int i = 0; i < storageTypes.length; ++i) {
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
public static StorageType[] convertStorageTypes(
List<StorageTypeProto> storageTypesList, int expectedSize) {
final StorageType[] storageTypes = new StorageType[expectedSize];
if (storageTypesList.size() != expectedSize) { // missing storage types
Preconditions.checkState(storageTypesList.isEmpty());
Arrays.fill(storageTypes, StorageType.DEFAULT);
} else {
for (int i = 0; i < storageTypes.length; ++i) {
storageTypes[i] = convertStorageType(storageTypesList.get(i));
}
}
return storageTypes;
}

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -368,7 +369,7 @@ private void dispatch() {
in = new DataInputStream(new BufferedInputStream(unbufIn,
HdfsConstants.IO_FILE_BUFFER_SIZE));
sendRequest(out, eb, accessToken);
sendRequest(out, eb, StorageType.DEFAULT, accessToken);
receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this);
@ -400,8 +401,9 @@ private void dispatch() {
/* Send a block replace request to the output stream*/
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
StorageType storageType,
Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, accessToken,
new Sender(out).replaceBlock(eb, storageType, accessToken,
source.getStorageID(), proxySource.getDatanode());
}

View File

@ -575,7 +575,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
bcmd.getTargets(), bcmd.getTargetStorageTypes());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:

View File

@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -122,7 +123,8 @@ class BlockReceiver implements Closeable {
private boolean syncOnClose;
private long restartBudget;
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@ -162,11 +164,11 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
replicaInfo = datanode.data.createTemporary(block);
replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
replicaInfo = datanode.data.createRbw(storageType, block);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
@ -198,7 +200,7 @@ class BlockReceiver implements Closeable {
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaInfo = datanode.data.createTemporary(block);
replicaInfo = datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);

View File

@ -19,11 +19,66 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,10 +94,23 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
@ -50,9 +118,20 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.*;
import org.apache.hadoop.hdfs.security.token.block.*;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -65,7 +144,11 @@
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@ -88,22 +171,21 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import javax.management.ObjectName;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.*;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@ -1475,8 +1557,8 @@ int getXmitsInProgress() {
return xmitsInProgress.get();
}
private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
throws IOException {
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@ -1512,16 +1594,17 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
new Daemon(new DataTransfer(xferTargets, block,
new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
DatanodeInfo xferTargets[][]) {
DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
for (int i = 0; i < blocks.length; i++) {
try {
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
xferTargetStorageTypes[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@ -1624,6 +1707,7 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
final StorageType[] targetStorageTypes;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@ -1634,7 +1718,8 @@ private class DataTransfer implements Runnable {
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
ExtendedBlock b, BlockConstructionStage stage,
final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@ -1644,6 +1729,7 @@ private class DataTransfer implements Runnable {
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@ -1702,7 +1788,8 @@ public void run() {
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
@ -2403,7 +2490,8 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException {
* @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final String client) throws IOException {
final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@ -2436,7 +2524,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
b.setNumBytes(visible);
if (targets.length > 0) {
new DataTransfer(targets, b, stage, client).run();
new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}

View File

@ -45,6 +45,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -524,9 +525,11 @@ public void readBlock(final ExtendedBlock block,
@Override
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@ -590,12 +593,13 @@ public void writeBlock(final ExtendedBlock block,
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, in,
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@ -636,10 +640,10 @@ public void writeBlock(final ExtendedBlock block,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
cachingStrategy);
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy);
mirrorOut.flush();
@ -754,7 +758,8 @@ public void writeBlock(final ExtendedBlock block,
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
@ -763,7 +768,8 @@ public void transferBlock(final ExtendedBlock blk,
final DataOutputStream out = new DataOutputStream(
getOutputStream());
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
@ -941,6 +947,7 @@ public void copyBlock(final ExtendedBlock block,
@Override
public void replaceBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
@ -1026,8 +1033,8 @@ public void replaceBlock(final ExtendedBlock block,
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
blockReceiver = new BlockReceiver(block, storageType,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -176,8 +177,8 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
) throws IOException;
public ReplicaInPipelineInterface createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
@ -186,8 +187,8 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createRbw(ExtendedBlock b
) throws IOException;
public ReplicaInPipelineInterface createRbw(StorageType storageType,
ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica

View File

@ -17,6 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -24,12 +46,37 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -43,15 +90,6 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.Executor;
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@ -736,8 +774,8 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo,
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
throws IOException {
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -746,7 +784,7 @@ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
" and thus cannot be created.");
}
// create a new block
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
@ -874,8 +912,8 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
throws IOException {
public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@ -883,7 +921,7 @@ public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
" and thus cannot be created.");
}
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),

View File

@ -18,13 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
class FsVolumeList {
/**
@ -52,11 +56,18 @@ int numberOfFailedVolumes() {
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
* @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
// TODO should choose volume with storage type
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
return blockChooser.chooseVolume(volumes, blockSize);
synchronized FsVolumeImpl getNextVolume(StorageType storageType,
long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
for(FsVolumeImpl v : volumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
}
return blockChooser.chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@ -50,6 +51,7 @@ public class BlockCommand extends DatanodeCommand {
final String poolId;
final Block[] blocks;
final DatanodeInfo[][] targets;
final StorageType[][] targetStorageTypes;
final String[][] targetStorageIDs;
/**
@ -62,17 +64,20 @@ public BlockCommand(int action, String poolId,
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
targetStorageTypes = new StorageType[blocks.length][];
targetStorageIDs = new String[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
@ -81,7 +86,7 @@ public BlockCommand(int action, String poolId,
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
EMPTY_TARGET_STORAGEIDS);
EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
}
/**
@ -89,11 +94,13 @@ public BlockCommand(int action, String poolId, Block blocks[]) {
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
DatanodeInfo[][] targets, String[][] targetStorageIDs) {
DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;
this.targetStorageIDs = targetStorageIDs;
}
@ -109,6 +116,10 @@ public DatanodeInfo[][] getTargets() {
return targets;
}
public StorageType[][] getTargetStorageTypes() {
return targetStorageTypes;
}
public String[][] getTargetStorageIDs() {
return targetStorageIDs;
}

View File

@ -113,6 +113,7 @@ message BlockCommandProto {
repeated BlockProto blocks = 3;
repeated DatanodeInfosProto targets = 4;
repeated StorageUuidsProto targetStorageUuids = 5;
repeated StorageTypesProto targetStorageTypes = 6;
}
/**

View File

@ -107,17 +107,21 @@ message OpWriteBlockProto {
*/
required ChecksumProto requestedChecksum = 9;
optional CachingStrategyProto cachingStrategy = 10;
optional StorageTypeProto storageType = 11 [default = DISK];
repeated StorageTypeProto targetStorageTypes = 12;
}
message OpTransferBlockProto {
required ClientOperationHeaderProto header = 1;
repeated DatanodeInfoProto targets = 2;
repeated StorageTypeProto targetStorageTypes = 3;
}
message OpReplaceBlockProto {
required BaseHeaderProto header = 1;
required string delHint = 2;
required DatanodeInfoProto source = 3;
optional StorageTypeProto storageType = 4 [default = DISK];
}
message OpCopyBlockProto {

View File

@ -137,6 +137,13 @@ enum StorageTypeProto {
ARCHIVE = 3;
}
/**
* A list of storage types.
*/
message StorageTypesProto {
repeated StorageTypeProto storageTypes = 1;
}
/**
* A list of storage IDs.
*/

View File

@ -380,7 +380,7 @@ public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
*/
public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
int racks, int replicas, int neededReplicas)
throws IOException, TimeoutException, InterruptedException {
throws TimeoutException, InterruptedException {
int curRacks = 0;
int curReplicas = 0;
int curNeededReplicas = 0;
@ -414,7 +414,7 @@ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
*/
public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
Path file, ExtendedBlock b, int corruptRepls)
throws IOException, TimeoutException, InterruptedException {
throws TimeoutException, InterruptedException {
int count = 0;
final int ATTEMPTS = 50;
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
@ -839,7 +839,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
// send the request
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]},
new StorageType[]{StorageType.DEFAULT});
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);

View File

@ -125,17 +125,16 @@ private void sendRecvData(String testDescription,
throw eof;
}
LOG.info("Received: " +new String(retBuf));
LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
String received = StringUtils.byteToHexString(retBuf);
String expected = StringUtils.byteToHexString(recvBuf.toByteArray());
LOG.info("Received: " + received);
LOG.info("Expected: " + expected);
if (eofExpected) {
throw new IOException("Did not recieve IOException when an exception " +
"is expected while reading from " + datanode);
}
byte[] needed = recvBuf.toByteArray();
assertEquals(StringUtils.byteToHexString(needed),
StringUtils.byteToHexString(retBuf));
assertEquals(expected, received);
} finally {
IOUtils.closeSocket(sock);
}
@ -184,10 +183,7 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
writeBlock(block, stage, newGS, DEFAULT_CHECKSUM);
if (eofExcepted) {
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
@ -343,10 +339,7 @@ public void testDataTransferProtocol() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
try {
cluster.waitActive();
DFSClient dfsClient = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()),
conf);
datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0];
dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
FileSystem fileSys = cluster.getFileSystem();
@ -381,23 +374,14 @@ public void testDataTransferProtocol() throws IOException {
DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0, 0L, 0L, 0L,
badChecksum, CachingStrategy.newDefaultStrategy());
writeBlock(poolId, newBlockId, badChecksum);
recvBuf.reset();
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
sendBuf.reset();
recvBuf.reset();
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
PacketHeader hdr = new PacketHeader(
4, // size of packet
@ -416,11 +400,7 @@ public void testDataTransferProtocol() throws IOException {
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM);
hdr = new PacketHeader(
8, // size of packet
@ -532,4 +512,18 @@ public void testPacketHeader() throws IOException {
assertTrue(hdr.sanityCheck(99));
assertFalse(hdr.sanityCheck(100));
}
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
writeBlock(new ExtendedBlock(poolId, blockId),
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
}
void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
long newGS, DataChecksum checksum) throws IOException {
sender.writeBlock(block, StorageType.DEFAULT,
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
checksum, CachingStrategy.newDefaultStrategy());
}
}

View File

@ -550,8 +550,10 @@ public void testConvertBlockCommand() {
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
StorageType[][] storageTypes = {{StorageType.DEFAULT},
{StorageType.DEFAULT, StorageType.DEFAULT}};
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
blocks, dnInfos, storageIDs);
blocks, dnInfos, storageTypes, storageIDs);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -324,7 +325,7 @@ public void blockReport_02() throws IOException {
public void blockReport_03() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
writeFile(METHOD_NAME, FILE_SIZE, filePath);
// all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N0);
@ -363,7 +364,7 @@ public void blockReport_04() throws IOException {
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
dn.getFSDataset().createRbw(b);
dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);

View File

@ -744,14 +744,14 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
throws IOException {
return createTemporary(b);
public synchronized ReplicaInPipelineInterface createRbw(
StorageType storageType, ExtendedBlock b) throws IOException {
return createTemporary(storageType, b);
}
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
throws IOException {
public synchronized ReplicaInPipelineInterface createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -531,7 +532,7 @@ public void testNoReplicaUnderRecovery() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
dn.data.createRbw(block);
dn.data.createRbw(StorageType.DEFAULT, block);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
@ -554,7 +555,8 @@ public void testNotMatchedReplicaID() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
StorageType.DEFAULT, block);
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@ -264,7 +265,8 @@ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
new Sender(out).replaceBlock(block, StorageType.DEFAULT,
BlockTokenSecretManager.DUMMY_TOKEN,
source.getDatanodeUuid(), sourceProxy);
out.flush();
// receiveResponse

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -147,9 +148,9 @@ public void testReplicationError() throws Exception {
DataChecksum checksum = DataChecksum.newDataChecksum(
DataChecksum.Type.CRC32, 512);
new Sender(out).writeBlock(block.getBlock(),
new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT,
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], null,
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum, CachingStrategy.newDefaultStrategy());
out.flush();

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -65,7 +66,8 @@ int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId)
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b);
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@ -147,7 +148,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep
};
ReplicaMap replicasMap = dataSet.volumeMap;
FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0);
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
@ -357,7 +358,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
}
try {
dataSet.createRbw(blocks[FINALIZED]);
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@ -375,7 +376,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
}
try {
dataSet.createRbw(blocks[TEMPORARY]);
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@ -385,7 +386,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
dataSet.createRbw(blocks[RBW]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@ -401,7 +402,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
}
try {
dataSet.createRbw(blocks[RWR]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@ -417,7 +418,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
}
try {
dataSet.createRbw(blocks[RUR]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@ -434,45 +435,45 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
dataSet.createRbw(blocks[NON_EXISTENT]);
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
dataSet.createTemporary(blocks[FINALIZED]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(blocks[TEMPORARY]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(blocks[RBW]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(blocks[RWR]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(blocks[RUR]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
dataSet.createTemporary(blocks[NON_EXISTENT]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
}
}

View File

@ -56,6 +56,9 @@ Release 2.6.0 - UNRELEASED
YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe)
YARN-2013. The diagnostics is always the ExitCodeException stack when the container
crashes. (Tsuyoshi OZAWA via junping_du)
OPTIMIZATIONS
BUG FIXES
@ -409,6 +412,9 @@ Release 2.5.0 - UNRELEASED
YARN-2269. Remove external links from YARN UI. (Craig Welch via xgong)
YARN-2270. Made TestFSDownload#testDownloadPublicWithStatCache be skipped
when theres no ancestor permissions. (Akira Ajisaka via zjshen)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -61,23 +61,27 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
function print_usage(){
echo "Usage: yarn [--config confdir] COMMAND"
echo "where COMMAND is one of:"
echo " resourcemanager -format deletes the RMStateStore"
echo " resourcemanager run the ResourceManager"
echo " nodemanager run a nodemanager on each slave"
echo " timelineserver run the timeline server"
echo " rmadmin admin tools"
echo " version print the version"
echo " jar <jar> run a jar file"
echo " application prints application(s) report/kill application"
echo " applicationattempt prints applicationattempt(s) report"
echo " container prints container(s) report"
echo " node prints node report(s)"
echo " logs dump container logs"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " daemonlog get/set the log level for each daemon"
echo " resourcemanager -format-state-store deletes the RMStateStore"
echo " resourcemanager run the ResourceManager"
echo " nodemanager run a nodemanager on each slave"
echo " timelineserver run the timeline server"
echo " rmadmin admin tools"
echo " version print the version"
echo " jar <jar> run a jar file"
echo " application prints application(s)"
echo " report/kill application"
echo " applicationattempt prints applicationattempt(s)"
echo " report"
echo " container prints container(s) report"
echo " node prints node report(s)"
echo " logs dump container logs"
echo " classpath prints the class path needed to"
echo " get the Hadoop jar and the"
echo " required libraries"
echo " daemonlog get/set the log level for each"
echo " daemon"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
}

View File

@ -177,9 +177,10 @@ private static boolean checkPublicPermsForAll(FileSystem fs,
/**
* Returns true if all ancestors of the specified path have the 'execute'
* permission set for all users (i.e. that other users can traverse
* the directory heirarchy to the given path)
* the directory hierarchy to the given path)
*/
private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
@VisibleForTesting
static boolean ancestorsHaveExecutePermissions(FileSystem fs,
Path path, LoadingCache<Path,Future<FileStatus>> statCache)
throws IOException {
Path current = path;

View File

@ -23,6 +23,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.FileOutputStream;
@ -66,6 +67,7 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -308,6 +310,11 @@ public void testDownloadPublicWithStatCache() throws IOException,
FileContext files = FileContext.getLocalFSFileContext(conf);
Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName()));
// if test directory doesn't have ancestor permission, skip this test
FileSystem f = basedir.getFileSystem(conf);
assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null));
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.base.Optional;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
@ -212,10 +213,21 @@ public int launchContainer(Container container,
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: "
+ e + "\n"
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
StringBuilder builder = new StringBuilder();
builder.append("Exception from container-launch.\n");
builder.append("Container id: " + containerId + "\n");
builder.append("Exit code: " + exitCode + "\n");
if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
builder.append("Exception message: " + e.getMessage() + "\n");
}
builder.append("Stack trace: "
+ StringUtils.stringifyException(e) + "\n");
if (!shExec.getOutput().isEmpty()) {
builder.append("Shell output: " + shExec.getOutput() + "\n");
}
String diagnostics = builder.toString();
logOutput(diagnostics);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -296,9 +297,21 @@ public int launchContainer(Container container,
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
StringBuilder builder = new StringBuilder();
builder.append("Exception from container-launch.\n");
builder.append("Container id: " + containerId + "\n");
builder.append("Exit code: " + exitCode + "\n");
if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
builder.append("Exception message: " + e.getMessage() + "\n");
}
builder.append("Stack trace: "
+ StringUtils.stringifyException(e) + "\n");
if (!shExec.getOutput().isEmpty()) {
builder.append("Shell output: " + shExec.getOutput() + "\n");
}
String diagnostics = builder.toString();
logOutput(diagnostics);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {

View File

@ -18,16 +18,37 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InputStream;
import java.io.IOException;
import java.io.LineNumberReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.junit.Assert;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
@ -45,15 +66,13 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
import static org.apache.hadoop.fs.CreateFlag.*;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import static org.mockito.Mockito.*;
import org.junit.After;
import org.junit.Assert;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDefaultContainerExecutor {
@ -191,6 +210,92 @@ public void testDirPermissions() throws Exception {
}
}
@Test
public void testContainerLaunchError()
throws IOException, InterruptedException {
Path localDir = new Path(BASE_TMP_PATH, "localDir");
List<String> localDirs = new ArrayList<String>();
localDirs.add(localDir.toString());
List<String> logDirs = new ArrayList<String>();
Path logDir = new Path(BASE_TMP_PATH, "logDir");
logDirs.add(logDir.toString());
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString());
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
FileContext lfs = FileContext.getLocalFSFileContext(conf);
DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs));
mockExec.setConf(conf);
doAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
String diagnostics = (String) invocationOnMock.getArguments()[0];
assertTrue("Invalid Diagnostics message: " + diagnostics,
diagnostics.contains("No such file or directory"));
return null;
}
}
).when(mockExec).logOutput(any(String.class));
String appSubmitter = "nobody";
String appId = "APP_ID";
String containerId = "CONTAINER_ID";
Container container = mock(Container.class);
ContainerId cId = mock(ContainerId.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String, String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
try {
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
ContainerDiagnosticsUpdateEvent event =
(ContainerDiagnosticsUpdateEvent) invocationOnMock
.getArguments()[0];
assertTrue("Invalid Diagnostics message: "
+ event.getDiagnosticsUpdate(),
event.getDiagnosticsUpdate().contains("No such file or directory")
);
return null;
}
}).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));
when(cId.toString()).thenReturn(containerId);
when(cId.getApplicationAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0));
when(context.getEnvironment()).thenReturn(env);
mockExec.createUserLocalDirs(localDirs, appSubmitter);
mockExec.createUserCacheDirs(localDirs, appSubmitter);
mockExec.createAppDirs(localDirs, appSubmitter, appId);
mockExec.createAppLogDirs(appId, logDirs);
Path scriptPath = new Path("file:///bin/echo");
Path tokensPath = new Path("file:///dev/null");
Path workDir = localDir;
Path pidFile = new Path(workDir, "pid.txt");
mockExec.init();
mockExec.activateContainer(cId, pidFile);
int ret = mockExec
.launchContainer(container, scriptPath, tokensPath, appSubmitter,
appId, workDir, localDirs, localDirs);
Assert.assertNotSame(0, ret);
} finally {
mockExec.deleteAsUser(appSubmitter, localDir);
mockExec.deleteAsUser(appSubmitter, logDir);
}
}
// @Test
// public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration();

View File

@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
@ -34,8 +38,6 @@
import java.util.LinkedList;
import java.util.List;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -46,9 +48,13 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestLinuxContainerExecutorWithMocks {
@ -216,7 +222,19 @@ public void testContainerLaunchError() throws IOException {
conf.set(YarnConfiguration.NM_LOCAL_DIRS, "file:///bin/echo");
conf.set(YarnConfiguration.NM_LOG_DIRS, "file:///dev/null");
mockExec = new LinuxContainerExecutor();
mockExec = spy(new LinuxContainerExecutor());
doAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
String diagnostics = (String) invocationOnMock.getArguments()[0];
assertTrue("Invalid Diagnostics message: " + diagnostics,
diagnostics.contains("badcommand"));
return null;
}
}
).when(mockExec).logOutput(any(String.class));
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
mockExec.setConf(conf);
@ -233,7 +251,22 @@ public void testContainerLaunchError() throws IOException {
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
doAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
ContainerDiagnosticsUpdateEvent event =
(ContainerDiagnosticsUpdateEvent) invocationOnMock
.getArguments()[0];
assertTrue("Invalid Diagnostics message: " +
event.getDiagnosticsUpdate(),
event.getDiagnosticsUpdate().contains("badcommand"));
return null;
}
}
).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));
when(cId.toString()).thenReturn(containerId);
when(context.getEnvironment()).thenReturn(env);

View File

@ -1035,8 +1035,8 @@ public static void main(String argv[]) {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
// If -format, then delete RMStateStore; else startup normally
if (argv.length == 1 && argv[0].equals("-format")) {
// If -format-state-store, then delete RMStateStore; else startup normally
if (argv.length == 1 && argv[0].equals("-format-state-store")) {
deleteRMStateStore(conf);
} else {
ResourceManager resourceManager = new ResourceManager();