diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 1f2bf649c4..8854e0702c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -6,4 +6,7 @@
 
     HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
     edit logs. (Arpit Agarwal)
+
+    HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+    (Arpit Agarwal)
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 14977a2507..c255bf6bbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -344,6 +344,7 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
     private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
+    private final boolean isLazyPersistFile;
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -358,8 +359,9 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
     /**
      * Default construction for file create
      */
-    private DataStreamer() {
+    private DataStreamer(HdfsFileStatus stat) {
       isAppend = false;
+      isLazyPersistFile = stat.isLazyPersist();
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
 
@@ -377,6 +379,7 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
+      isLazyPersistFile = stat.isLazyPersist();
 
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -1352,7 +1355,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
         new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-            cachingStrategy.get());
+            cachingStrategy.get(), isLazyPersistFile);
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1601,7 +1604,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
 
-    streamer = new DataStreamer();
+    streamer = new DataStreamer(stat);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1650,7 +1653,7 @@ private DFSOutputStream(DFSClient dfsClient, String src,
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer();
+      streamer = new DataStreamer(stat);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5bed00..f6b99e6160 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@ public void writeBlock(final ExtendedBlock blk,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index a09437c0b0..78693bb8e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -137,7 +137,8 @@ private void opWriteBlock(DataInputStream in) throws IOException {
         fromProto(proto.getRequestedChecksum()),
         (proto.hasCachingStrategy() ?
             getCachingStrategy(proto.getCachingStrategy()) :
-            CachingStrategy.newDefaultStrategy()));
+            CachingStrategy.newDefaultStrategy()),
+        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 68da52399c..4298bb0692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -124,7 +124,8 @@ public void writeBlock(final ExtendedBlock blk,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
 
@@ -142,7 +143,8 @@ public void writeBlock(final ExtendedBlock blk,
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy));
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
+      .setAllowLazyPersist(allowLazyPersist);
 
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1ec91d005b..e86ea0d46a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1809,7 +1809,8 @@ public void run() {
 
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+            false);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c9353c..3b8304e718 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@ public void writeBlock(final ExtendedBlock block,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -648,10 +649,11 @@ public void writeBlock(final ExtendedBlock block,
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
+          // Do not propagate allowLazyPersist to downstream DataNodes.
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy);
+              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
 
           mirrorOut.flush();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 6283b569dd..13747ab558 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -109,6 +109,13 @@ message OpWriteBlockProto {
   optional CachingStrategyProto cachingStrategy = 10;
   optional StorageTypeProto storageType = 11 [default = DISK];
   repeated StorageTypeProto targetStorageTypes = 12;
+
+  /**
+   * Hint to the DataNode that the block can be allocated on transient
+   * storage i.e. memory and written to disk lazily. The DataNode is free
+   * to ignore this hint.
+   */
+  optional bool allowLazyPersist = 13 [default = false];
 }
 
 message OpTransferBlockProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index bcb68e9ce7..3586551011 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -524,6 +524,6 @@ void writeBlock(ExtendedBlock block, BlockConstructionStage stage,
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1ec4..f440bb6fe5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public void testReplicationError() throws Exception {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
     out.flush();

     // close the connection before sending the content of the block
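Usage note: after this patch, a client-initiated pipeline setup passes the LazyPersist
hint as the final argument to DataTransferProtocol#writeBlock. The sketch below is
illustrative only and is not part of the patch; the variables out, block, accessToken,
targets, targetStorageTypes, newGS and checksum are assumed to already be in scope.
Because the proto field is optional with [default = false], receivers interoperate with
older senders that omit it, and (per the DataXceiver change above) the first DataNode
passes false when mirroring to downstream nodes, so the hint applies only to the first
replica.

    // Sketch only: a client write that sets the allowLazyPersist hint.
    // The DataNode is free to ignore the hint.
    new Sender(out).writeBlock(
        block,                                  // ExtendedBlock to write
        targetStorageTypes[0],                  // storage type on the first DN
        accessToken,                            // block access token
        "client-1",                             // client name; "" means DN-initiated
        targets,                                // DatanodeInfo[] pipeline targets
        targetStorageTypes,                     // storage type per target
        null,                                   // no source DN for a client write
        BlockConstructionStage.PIPELINE_SETUP_CREATE,
        targets.length,                         // pipeline size
        0L, 0L,                                 // minBytesRcvd, maxBytesRcvd
        newGS,                                  // latest generation stamp
        checksum,                               // DataChecksum
        CachingStrategy.newDefaultStrategy(),
        true);                                  // allowLazyPersist hint (new)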