From 6824abc19e12ed142d9f32b8706ef73d97edd1cc Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Thu, 21 Aug 2014 01:13:49 +0000
Subject: [PATCH] HDFS-6758. Block writer should pass the expected block size
 to DataXceiverServer (Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1619275 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   5 +-
 .../apache/hadoop/hdfs/DFSOutputStream.java   |   8 +-
 .../hdfs/server/datanode/DataXceiver.java     |   4 +-
 .../server/datanode/DataXceiverServer.java    |   7 +-
 .../TestWriteBlockGetsBlockLengthHint.java    | 106 ++++++++++++++++++
 5 files changed, 122 insertions(+), 8 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0221018c46..14ef2f1989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -500,7 +500,10 @@ Release 2.6.0 - UNRELEASED
     hadoop.security.saslproperties.resolver.class. (Benoy Antony via cnauroth)
 
     HDFS-6878. Change MiniDFSCluster to support StorageType configuration
-    for individual directories (Arpit Agarwal)
+    for individual directories. (Arpit Agarwal)
+
+    HDFS-6758. block writer should pass the expected block size to
+    DataXceiverServer. (Arpit Agarwal)
 
   OPTIMIZATIONS
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4ba2a3ba8d..14977a2507 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1342,8 +1342,14 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
         //
         BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
+
+        // We cannot change the block length in 'block' as it counts the number
+        // of bytes ack'ed.
+        ExtendedBlock blockCopy = new ExtendedBlock(block);
+        blockCopy.setNumBytes(blockSize);
+
         // send the request
-        new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
             dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
             cachingStrategy.get());
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 2d72f0110a..4575c9353c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -576,7 +576,9 @@ public void writeBlock(final ExtendedBlock block,
     // forward the original version of the block to downstream mirrors, so
     // make a copy here.
     final ExtendedBlock originalBlock = new ExtendedBlock(block);
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    if (block.getNumBytes() == 0) {
+      block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    }
     LOG.info("Receiving " + block + " src: " + remoteAddress
         + " dest: " + localAddress);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 2c03a5123b..a1f766d624 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -100,11 +100,8 @@ synchronized void release() {
 
   /**
    * We need an estimate for block size to check if the disk partition has
-   * enough space. For now we set it to be the default block size set
-   * in the server side configuration, which is not ideal because the
-   * default block size should be a client-size configuration.
-   * A better solution is to include in the header the estimated block size,
-   * i.e. either the actual block size or the default block size.
+   * enough space. Newer clients pass the expected block size to the DataNode.
+   * For older clients we just use the server-side default block size.
    */
   final long estimateBlockSize;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
new file mode 100644
index 0000000000..29ac3f2c47
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+
+/**
+ * Test to verify that the DFSClient passes the expected block length to
+ * the DataNode via DataTransferProtocol.
+ */
+public class TestWriteBlockGetsBlockLengthHint {
+  static final long DEFAULT_BLOCK_LENGTH = 1024;
+  static final long EXPECTED_BLOCK_LENGTH = DEFAULT_BLOCK_LENGTH * 2;
+
+  @Test
+  public void blockLengthHintIsPropagated() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    Configuration conf = new HdfsConfiguration();
+    FsDatasetChecker.setFactory(conf);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_LENGTH);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+
+      // FsDatasetChecker#createRbw asserts during block creation if the test
+      // fails.
+      DFSTestUtil.createFile(
+          cluster.getFileSystem(),
+          path,
+          4096,  // Buffer size.
+          EXPECTED_BLOCK_LENGTH,
+          EXPECTED_BLOCK_LENGTH,
+          (short) 1,
+          0x1BAD5EED);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static class FsDatasetChecker extends SimulatedFSDataset {
+    static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+      @Override
+      public SimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new FsDatasetChecker(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+
+    public static void setFactory(Configuration conf) {
+      conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+               Factory.class.getName());
+    }
+
+    public FsDatasetChecker(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    /**
+     * Override createRbw to verify that the block length that is passed
+     * is correct. This requires both DFSOutputStream and BlockReceiver to
+     * correctly propagate the hint to FsDatasetSpi.
+     */
+    @Override
+    public synchronized ReplicaInPipelineInterface createRbw(
+        StorageType storageType, ExtendedBlock b) throws IOException {
+      assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
+      return super.createRbw(storageType, b);
+    }
+  }
+}
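
Note (not part of the patch): as a rough illustration of where the expected block size originates on the client side, the sketch below creates a file with an explicit block size through the public FileSystem API. With this change, DFSOutputStream copies that value into the ExtendedBlock sent with the writeBlock request, and a DataNode that receives numBytes == 0 (an older client) falls back to its configured default. The path, sizes, and class name are illustrative assumptions, not part of the Hadoop code base.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeHintExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // The block size requested here becomes the "expected block size" that
    // DFSOutputStream forwards to the DataNode in the writeBlock request,
    // so the DataNode can check free space against the real target size
    // instead of its server-side default (dfs.blocksize).
    long expectedBlockSize = 128L * 1024 * 1024; // illustrative 128 MB
    try (FSDataOutputStream out = fs.create(
        new Path("/tmp/block-size-hint-example.dat"), // hypothetical path
        true,        // overwrite
        4096,        // buffer size
        (short) 3,   // replication
        expectedBlockSize)) {
      out.writeBytes("payload");
    }
  }
}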