HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol. (Arpit Agarwal)

This commit is contained in:
arp 2014-08-27 15:13:20 -07:00 committed by arp7
parent 042b33f20b
commit c2354a7f81
10 changed files with 33 additions and 14 deletions

View File

@ -7,3 +7,6 @@
HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
edit logs. (Arpit Agarwal)
HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
(Arpit Agarwal)

View File

@ -344,6 +344,7 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@ -358,8 +359,9 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
/**
* Default construction for file create
*/
private DataStreamer() {
private DataStreamer(HdfsFileStatus stat) {
isAppend = false;
isLazyPersistFile = stat.isLazyPersist();
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@ -377,6 +379,7 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
isLazyPersistFile = stat.isLazyPersist();
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@ -1352,7 +1355,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get());
cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1601,7 +1604,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer();
streamer = new DataStreamer(stat);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@ -1650,7 +1653,7 @@ private DFSOutputStream(DFSClient dfsClient, String src,
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer();
streamer = new DataStreamer(stat);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}

View File

@ -106,8 +106,8 @@ public void writeBlock(final ExtendedBlock blk,
final long maxBytesRcvd,
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException;
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be

View File

@ -137,7 +137,8 @@ private void opWriteBlock(DataInputStream in) throws IOException {
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
}
/** Receive {@link Op#TRANSFER_BLOCK} */

View File

@ -124,7 +124,8 @@ public void writeBlock(final ExtendedBlock blk,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException {
final CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@ -142,7 +143,8 @@ public void writeBlock(final ExtendedBlock blk,
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy));
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.setAllowLazyPersist(allowLazyPersist);
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));

View File

@ -1809,7 +1809,8 @@ public void run() {
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
false);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);

View File

@ -544,7 +544,8 @@ public void writeBlock(final ExtendedBlock block,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException {
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@ -648,10 +649,11 @@ public void writeBlock(final ExtendedBlock block,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy);
latestGenerationStamp, requestedChecksum, cachingStrategy, false);
mirrorOut.flush();

View File

@ -109,6 +109,13 @@ message OpWriteBlockProto {
optional CachingStrategyProto cachingStrategy = 10;
optional StorageTypeProto storageType = 11 [default = DISK];
repeated StorageTypeProto targetStorageTypes = 12;
/**
* Hint to the DataNode that the block can be allocated on transient
* storage i.e. memory and written to disk lazily. The DataNode is free
* to ignore this hint.
*/
optional bool allowLazyPersist = 13 [default = false];
}
message OpTransferBlockProto {

View File

@ -524,6 +524,6 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
checksum, CachingStrategy.newDefaultStrategy());
checksum, CachingStrategy.newDefaultStrategy(), false);
}
}

View File

@ -152,7 +152,7 @@ public void testReplicationError() throws Exception {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum, CachingStrategy.newDefaultStrategy());
checksum, CachingStrategy.newDefaultStrategy(), false);
out.flush();
// close the connection before sending the content of the block