From 905a127850d5e0cba85c2e075f989fa0f5cf129a Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 18 Nov 2011 09:04:20 +0000 Subject: [PATCH] HDFS-2562. Refactor DN configuration variables out of DataNode class. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203543 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BlockReceiver.java | 10 +- .../hdfs/server/datanode/BlockSender.java | 6 +- .../hadoop/hdfs/server/datanode/DNConf.java | 115 +++++++++++++++ .../hadoop/hdfs/server/datanode/DataNode.java | 134 ++++-------------- .../hdfs/server/datanode/DataXceiver.java | 50 +++---- .../datanode/TestInterDatanodeProtocol.java | 2 +- 7 files changed, 176 insertions(+), 144 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a27543d301..3ee413ea7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -114,6 +114,9 @@ Release 0.23.1 - UNRELEASED HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite) + HDFS-2562. Refactor DN configuration variables out of DataNode class + (todd) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 9277956e1f..4b961522d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -185,8 +185,8 @@ class BlockReceiver implements Closeable { " while receiving block " + block + " from " + inAddr); } } - this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites(); - this.syncBehindWrites = datanode.shouldSyncBehindWrites(); + this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites; + this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; @@ -249,7 +249,7 @@ public void close() throws IOException { try { if (checksumOut != null) { checksumOut.flush(); - if (datanode.syncOnClose && (cout instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) { ((FileOutputStream)cout).getChannel().force(true); } checksumOut.close(); @@ -265,7 +265,7 @@ public void close() throws IOException { try { if (out != null) { out.flush(); - if (datanode.syncOnClose && (out instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) { ((FileOutputStream)out).getChannel().force(true); } out.close(); @@ -435,7 +435,7 @@ private void readNextPacket() throws IOException { * calculation in DFSClient to make the guess accurate. */ int chunkSize = bytesPerChecksum + checksumSize; - int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN + int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN + chunkSize - 1)/chunkSize; buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN + Math.max(chunksPerPacket, 1) * chunkSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index f4168ee1c9..cf4e803260 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -185,8 +185,8 @@ class BlockSender implements java.io.Closeable { this.corruptChecksumOk = corruptChecksumOk; this.verifyChecksum = verifyChecksum; this.clientTraceFmt = clientTraceFmt; - this.readaheadLength = datanode.getReadaheadLength(); - this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads(); + this.readaheadLength = datanode.getDnConf().readaheadLength; + this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads; synchronized(datanode.data) { this.replica = getReplica(block, datanode); @@ -215,7 +215,7 @@ class BlockSender implements java.io.Closeable { // transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // use normal transfer in those cases - this.transferToAllowed = datanode.transferToAllowed && + this.transferToAllowed = datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE); DataChecksum csum; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java new file mode 100644 index 0000000000..e4bf9a676d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + +/** + * Simple class encapsulating all of the configuration that the DataNode + * loads at startup time. + */ +class DNConf { + final int socketTimeout; + final int socketWriteTimeout; + final int socketKeepaliveTimeout; + + final boolean transferToAllowed; + final boolean dropCacheBehindWrites; + final boolean syncBehindWrites; + final boolean dropCacheBehindReads; + final boolean syncOnClose; + + + final long readaheadLength; + final long heartBeatInterval; + final long blockReportInterval; + final long deleteReportInterval; + final long initialBlockReportDelay; + final int writePacketSize; + + public DNConf(Configuration conf) { + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsServerConstants.READ_TIMEOUT); + socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsServerConstants.WRITE_TIMEOUT); + socketKeepaliveTimeout = conf.getInt( + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); + + /* Based on results on different platforms, we might need set the default + * to false on some of them. */ + transferToAllowed = conf.getBoolean( + DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, + DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); + + writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + + readaheadLength = conf.getLong( + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + dropCacheBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); + syncBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); + dropCacheBehindReads = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); + + this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + + long initBRDelay = conf.getLong( + DFS_BLOCKREPORT_INITIAL_DELAY_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; + if (initBRDelay >= blockReportInterval) { + initBRDelay = 0; + DataNode.LOG.info("dfs.blockreport.initialDelay is greater than " + + "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); + } + initialBlockReportDelay = initBRDelay; + + heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, + DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; + + this.deleteReportInterval = 100 * heartBeatInterval; + // do we need to sync block file contents to disk when blockfile is closed? + this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, + DFS_DATANODE_SYNCONCLOSE_DEFAULT); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 32fe56c63f..df8a2adcca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -19,15 +19,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; @@ -51,17 +44,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.server.common.Util.now; @@ -104,7 +90,6 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -402,10 +387,7 @@ void refreshNamenodes(Configuration conf) AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; - long blockReportInterval; - long deleteReportInterval; - long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; - long heartBeatInterval; + private DNConf dnConf; private boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; @@ -415,18 +397,9 @@ void refreshNamenodes(Configuration conf) private volatile String hostName; // Host name of this datanode private static String dnThreadName; - int socketTimeout; - int socketWriteTimeout = 0; - boolean transferToAllowed = true; - private boolean dropCacheBehindWrites = false; - private boolean syncBehindWrites = false; - private boolean dropCacheBehindReads = false; - private long readaheadLength = 0; - int writePacketSize = 0; boolean isBlockTokenEnabled; BlockPoolTokenSecretManager blockPoolTokenSecretManager; - boolean syncOnClose; public DataBlockScanner blockScanner = null; private DirectoryScanner directoryScanner = null; @@ -494,51 +467,6 @@ private static String getHostName(Configuration config) return name; } - private void initConfig(Configuration conf) { - this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT); - this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, - HdfsServerConstants.WRITE_TIMEOUT); - /* Based on results on different platforms, we might need set the default - * to false on some of them. */ - this.transferToAllowed = conf.getBoolean( - DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, - DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); - this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, - DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - - this.readaheadLength = conf.getLong( - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - this.dropCacheBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); - this.syncBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); - this.dropCacheBehindReads = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); - - this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - this.initialBlockReportDelay = conf.getLong( - DFS_BLOCKREPORT_INITIAL_DELAY_KEY, - DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; - if (this.initialBlockReportDelay >= blockReportInterval) { - this.initialBlockReportDelay = 0; - LOG.info("dfs.blockreport.initialDelay is greater than " + - "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); - } - this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, - DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; - - this.deleteReportInterval = 100 * heartBeatInterval; - // do we need to sync block file contents to disk when blockfile is closed? - this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, - DFS_DATANODE_SYNCONCLOSE_DEFAULT); - } - private void startInfoServer(Configuration conf) throws IOException { // create a servlet to serve full-file content InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); @@ -709,7 +637,7 @@ private void initDataXceiver(Configuration conf) throws IOException { // find free port or use privileged port provided ServerSocket ss; if(secureResources == null) { - ss = (socketWriteTimeout > 0) ? + ss = (dnConf.socketWriteTimeout > 0) ? ServerSocketChannel.open().socket() : new ServerSocket(); Server.bind(ss, socAddr, 0); } else { @@ -794,11 +722,13 @@ static class BPOfferService implements Runnable { private volatile boolean shouldServiceRun = true; UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; + private final DNConf dnConf; BPOfferService(InetSocketAddress nnAddr, DataNode dn) { this.dn = dn; this.bpRegistration = dn.createRegistration(); this.nnAddr = nnAddr; + this.dnConf = dn.getDnConf(); } /** @@ -900,9 +830,9 @@ void setupBP(Configuration conf) void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - dn.blockReportInterval; + lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -1007,7 +937,7 @@ DatanodeCommand blockReport() throws IOException { // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > dn.blockReportInterval) { + if (startTime - lastBlockReport > dnConf.blockReportInterval) { // Create block report long brCreateStartTime = now(); @@ -1029,7 +959,7 @@ DatanodeCommand blockReport() throws IOException { // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report @@ -1039,7 +969,7 @@ DatanodeCommand blockReport() throws IOException { * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - dn.blockReportInterval * dn.blockReportInterval; + dnConf.blockReportInterval * dnConf.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1101,10 +1031,10 @@ private synchronized void cleanUp() { */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " - + dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " - + dn.blockReportInterval + "msec" + " Initial delay: " - + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + dn.heartBeatInterval); + + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + + dnConf.blockReportInterval + "msec" + " Initial delay: " + + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dnConf.heartBeatInterval); // // Now loop for a long time.... @@ -1116,7 +1046,7 @@ private void offerService() throws Exception { // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > dn.heartBeatInterval) { + if (startTime - lastHeartbeat > dnConf.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1140,7 +1070,7 @@ private void offerService() throws Exception { } } if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > dn.deleteReportInterval)) { + || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { reportReceivedDeletedBlocks(); lastDeletedReport = startTime; } @@ -1157,7 +1087,7 @@ private void offerService() throws Exception { // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = dn.heartBeatInterval - + long waitTime = dnConf.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedAndDeletedBlockList) { if (waitTime > 0 && pendingReceivedRequests == 0) { @@ -1180,7 +1110,7 @@ private void offerService() throws Exception { } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, dn.heartBeatInterval); + long sleepTime = Math.min(1000, dnConf.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1248,7 +1178,7 @@ void register() throws IOException { LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(dn.initialBlockReportDelay); + scheduleBlockReport(dnConf.initialBlockReportDelay); } @@ -1458,11 +1388,11 @@ void startDataNode(Configuration conf, this.secureResources = resources; this.dataDirs = dataDirs; this.conf = conf; + this.dnConf = new DNConf(conf); storage = new DataStorage(); // global DN settings - initConfig(conf); registerMXBean(); initDataXceiver(conf); startInfoServer(conf); @@ -1710,7 +1640,7 @@ DatanodeRegistration getDNRegistrationByMachineName(String mName) { * Creates either NIO or regular depending on socketWriteTimeout. */ protected Socket newSocket() throws IOException { - return (socketWriteTimeout > 0) ? + return (dnConf.socketWriteTimeout > 0) ? SocketChannel.open().socket() : new Socket(); } @@ -2135,10 +2065,10 @@ public void run() { InetSocketAddress curTarget = NetUtils.createSocketAddr(targets[0].getName()); sock = newSocket(); - NetUtils.connect(sock, curTarget, socketTimeout); - sock.setSoTimeout(targets.length * socketTimeout); + NetUtils.connect(sock, curTarget, dnConf.socketTimeout); + sock.setSoTimeout(targets.length * dnConf.socketTimeout); - long writeTimeout = socketWriteTimeout + + long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); out = new DataOutputStream(new BufferedOutputStream(baseStream, @@ -2581,7 +2511,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException { DatanodeRegistration bpReg = bpos.bpRegistration; InterDatanodeProtocol datanode = bpReg.equals(id)? this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), - socketTimeout); + dnConf.socketTimeout); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && @@ -2970,20 +2900,8 @@ public Long getBalancerBandwidth() { (DataXceiverServer) this.dataXceiverServer.getRunnable(); return dxcs.balanceThrottler.getBandwidth(); } - - long getReadaheadLength() { - return readaheadLength; - } - - boolean shouldDropCacheBehindWrites() { - return dropCacheBehindWrites; - } - - boolean shouldDropCacheBehindReads() { - return dropCacheBehindReads; - } - boolean shouldSyncBehindWrites() { - return syncBehindWrites; + DNConf getDnConf() { + return dnConf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d6a3963c0b..11282a5b7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -38,7 +38,6 @@ import java.util.Arrays; import org.apache.commons.logging.Log; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -82,9 +81,9 @@ class DataXceiver extends Receiver implements Runnable { private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; + private final DNConf dnConf; private final DataXceiverServer dataXceiverServer; - private int socketKeepaliveTimeout; private long opStartTime; //the start time of receiving an Op public DataXceiver(Socket s, DataNode datanode, @@ -95,14 +94,11 @@ public DataXceiver(Socket s, DataNode datanode, this.s = s; this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; + this.dnConf = datanode.getDnConf(); this.dataXceiverServer = dataXceiverServer; remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); - socketKeepaliveTimeout = datanode.getConf().getInt( - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); @@ -144,8 +140,8 @@ public void run() { try { if (opsProcessed != 0) { - assert socketKeepaliveTimeout > 0; - s.setSoTimeout(socketKeepaliveTimeout); + assert dnConf.socketKeepaliveTimeout > 0; + s.setSoTimeout(dnConf.socketKeepaliveTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -180,7 +176,7 @@ public void run() { opStartTime = now(); processOp(op); ++opsProcessed; - } while (!s.isClosed() && socketKeepaliveTimeout > 0); + } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); } catch (Throwable t) { LOG.error(datanode.getMachineName() + ":DataXceiver error processing " + ((op == null) ? "unknown" : op.name()) + " operation " + @@ -205,7 +201,7 @@ public void readBlock(final ExtendedBlock block, final long blockOffset, final long length) throws IOException { OutputStream baseStream = NetUtils.getOutputStream(s, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(out, true, block, blockToken, @@ -231,13 +227,13 @@ public void readBlock(final ExtendedBlock block, } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); throw e; } // send op status writeSuccessWithChecksumInfo(blockSender, - getStreamWithTimeout(s, datanode.socketWriteTimeout)); + getStreamWithTimeout(s, dnConf.socketWriteTimeout)); long read = blockSender.sendBlock(out, baseStream, null); // send data @@ -335,7 +331,7 @@ public void writeBlock(final ExtendedBlock block, // reply to upstream datanode or client final DataOutputStream replyOut = new DataOutputStream( new BufferedOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout), + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); @@ -370,9 +366,9 @@ public void writeBlock(final ExtendedBlock block, mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorSock = datanode.newSocket(); try { - int timeoutValue = datanode.socketTimeout + int timeoutValue = dnConf.socketTimeout + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length); - int writeTimeout = datanode.socketWriteTimeout + + int writeTimeout = dnConf.socketWriteTimeout + (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); @@ -508,7 +504,7 @@ public void transferBlock(final ExtendedBlock blk, updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); @@ -521,7 +517,7 @@ public void transferBlock(final ExtendedBlock blk, public void blockChecksum(final ExtendedBlock block, final Token blockToken) throws IOException { final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); @@ -581,7 +577,7 @@ public void copyBlock(final ExtendedBlock block, LOG.warn("Invalid access token in request from " + remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage()); - sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout); + sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout); return; } @@ -591,7 +587,7 @@ public void copyBlock(final ExtendedBlock block, String msg = "Not able to copy block " + block.getBlockId() + " to " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -606,7 +602,7 @@ public void copyBlock(final ExtendedBlock block, // set up response stream OutputStream baseStream = NetUtils.getOutputStream( - s, datanode.socketWriteTimeout); + s, dnConf.socketWriteTimeout); reply = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -659,7 +655,7 @@ public void replaceBlock(final ExtendedBlock block, + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage()); sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); return; } } @@ -668,7 +664,7 @@ public void replaceBlock(final ExtendedBlock block, String msg = "Not able to receive block " + block.getBlockId() + " from " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.warn(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -684,11 +680,11 @@ public void replaceBlock(final ExtendedBlock block, InetSocketAddress proxyAddr = NetUtils.createSocketAddr( proxySource.getName()); proxySock = datanode.newSocket(); - NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout); - proxySock.setSoTimeout(datanode.socketTimeout); + NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); + proxySock.setSoTimeout(dnConf.socketTimeout); OutputStream baseStream = NetUtils.getOutputStream(proxySock, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -750,7 +746,7 @@ public void replaceBlock(final ExtendedBlock block, // send response back try { - sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout); + sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout); } catch (IOException ioe) { LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); } @@ -826,7 +822,7 @@ private void checkAccess(DataOutputStream out, final boolean reply, if (reply) { if (out == null) { out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); } BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index 90869a2637..487adfe5bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -154,7 +154,7 @@ public void testBlockMetaDataInfo() throws Exception { //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy( - datanodeinfo[0], conf, datanode.socketTimeout); + datanodeinfo[0], conf, datanode.getDnConf().socketTimeout); assertTrue(datanode != null); //stop block scanner, so we could compare lastScanTime