diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index be6aa83ce6..456dcc1589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -165,6 +165,10 @@ private static long getConfiguredScanPeriodMs(Configuration conf) { } } + public BlockScanner(DataNode datanode) { + this(datanode, datanode.getConf()); + } + public BlockScanner(DataNode datanode, Configuration conf) { this.datanode = datanode; this.conf = new Conf(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 757d7f5eda..823d05c425 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -56,6 +56,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -72,7 +73,6 @@ */ @InterfaceAudience.Private public class DNConf { - final Configuration conf; final int socketTimeout; final int socketWriteTimeout; final int socketKeepaliveTimeout; @@ -120,73 +120,77 @@ public class DNConf { private final int volFailuresTolerated; private final int volsConfigured; private final int maxDataLength; + private Configurable dn; - public DNConf(Configuration conf) { - this.conf = conf; - socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + public DNConf(final Configurable dn) { + this.dn = dn; + socketTimeout = getConf().getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); - socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + socketWriteTimeout = getConf().getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, HdfsConstants.WRITE_TIMEOUT); - socketKeepaliveTimeout = conf.getInt( + socketKeepaliveTimeout = getConf().getInt( DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - this.transferSocketSendBufferSize = conf.getInt( + this.transferSocketSendBufferSize = getConf().getInt( DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); - this.transferSocketRecvBufferSize = conf.getInt( + this.transferSocketRecvBufferSize = getConf().getInt( DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT); - this.tcpNoDelay = conf.getBoolean( + this.tcpNoDelay = getConf().getBoolean( DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY, DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT); /* Based on results on different platforms, we might need set the default * to false on some of them. */ - transferToAllowed = conf.getBoolean( + transferToAllowed = getConf().getBoolean( DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); - writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + writePacketSize = getConf().getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - readaheadLength = conf.getLong( + readaheadLength = getConf().getLong( HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - maxDataLength = conf.getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH, + maxDataLength = getConf().getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH, DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - dropCacheBehindWrites = conf.getBoolean( + dropCacheBehindWrites = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); - syncBehindWrites = conf.getBoolean( + syncBehindWrites = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); - syncBehindWritesInBackground = conf.getBoolean( + syncBehindWritesInBackground = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY, DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT); - dropCacheBehindReads = conf.getBoolean( + dropCacheBehindReads = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); - connectToDnViaHostname = conf.getBoolean( + connectToDnViaHostname = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); - this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + this.blockReportInterval = getConf().getLong( + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - this.ibrInterval = conf.getLong( + this.ibrInterval = getConf().getLong( DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT); - this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, - DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT); - this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, + this.blockReportSplitThreshold = getConf().getLong( + DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, + DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT); + this.cacheReportInterval = getConf().getLong( + DFS_CACHEREPORT_INTERVAL_MSEC_KEY, DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT); - this.dfsclientSlowIoWarningThresholdMs = conf.getLong( + this.dfsclientSlowIoWarningThresholdMs = getConf().getLong( HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); - this.datanodeSlowIoWarningThresholdMs = conf.getLong( + this.datanodeSlowIoWarningThresholdMs = getConf().getLong( DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT); - long initBRDelay = conf.getTimeDuration( + long initBRDelay = getConf().getTimeDuration( DFS_BLOCKREPORT_INITIAL_DELAY_KEY, DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS) * 1000L; if (initBRDelay >= blockReportInterval) { @@ -197,11 +201,11 @@ public DNConf(Configuration conf) { } initialBlockReportDelayMs = initBRDelay; - heartBeatInterval = conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, + heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS) * 1000L; long confLifelineIntervalMs = - conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, - 3 * conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, + getConf().getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, + 3 * getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS)) * 1000L; if (confLifelineIntervalMs <= heartBeatInterval) { confLifelineIntervalMs = 3 * heartBeatInterval; @@ -215,47 +219,50 @@ public DNConf(Configuration conf) { lifelineIntervalMs = confLifelineIntervalMs; // do we need to sync block file contents to disk when blockfile is closed? - this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, + this.syncOnClose = getConf().getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, DFS_DATANODE_SYNCONCLOSE_DEFAULT); - this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY, + this.minimumNameNodeVersion = getConf().get( + DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY, DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT); - this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, + this.encryptDataTransfer = getConf().getBoolean( + DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); - this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); - this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); + this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf()); this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( - conf); - this.ignoreSecurePortsForTesting = conf.getBoolean( + getConf()); + this.ignoreSecurePortsForTesting = getConf().getBoolean( IGNORE_SECURE_PORTS_FOR_TESTING_KEY, IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT); - this.xceiverStopTimeout = conf.getLong( + this.xceiverStopTimeout = getConf().getLong( DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); - this.maxLockedMemory = conf.getLong( + this.maxLockedMemory = getConf().getLong( DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT); - this.restartReplicaExpiry = conf.getLong( + this.restartReplicaExpiry = getConf().getLong( DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; - this.allowNonLocalLazyPersist = conf.getBoolean( + this.allowNonLocalLazyPersist = getConf().getBoolean( DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT); - this.bpReadyTimeout = conf.getTimeDuration( + this.bpReadyTimeout = getConf().getTimeDuration( DFS_DATANODE_BP_READY_TIMEOUT_KEY, DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT, TimeUnit.SECONDS); this.volFailuresTolerated = - conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, + getConf().getInt( + DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); String[] dataDirs = - conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); + getConf().getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; } @@ -270,7 +277,7 @@ String getMinimumNameNodeVersion() { * @return Configuration the configuration */ public Configuration getConf() { - return conf; + return this.dn.getConf(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 09ecac1dce..abff79695d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -363,7 +363,6 @@ public static InetSocketAddress createSocketAddr(String target) { private SecureResources secureResources = null; // dataDirs must be accessed while holding the DataNode lock. private List dataDirs; - private Configuration conf; private final String confVersion; private final long maxNumberOfBlocksToLog; private final boolean pipelineSupportECN; @@ -419,7 +418,7 @@ private static Tracer createTracer(Configuration conf) { this.confVersion = null; this.usersWithLocalPathAccess = null; this.connectToDnViaHostname = false; - this.blockScanner = new BlockScanner(this, conf); + this.blockScanner = new BlockScanner(this, this.getConf()); this.pipelineSupportECN = false; this.checkDiskErrorInterval = ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25)); @@ -438,7 +437,7 @@ private static Tracer createTracer(Configuration conf) { this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); - this.blockScanner = new BlockScanner(this, conf); + this.blockScanner = new BlockScanner(this); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -487,7 +486,7 @@ private static Tracer createTracer(Configuration conf) { try { hostName = getHostName(conf); LOG.info("Configured hostname is " + hostName); - startDataNode(conf, dataDirs, resources); + startDataNode(dataDirs, resources); } catch (IOException ie) { shutdown(); throw ie; @@ -527,7 +526,7 @@ public String reconfigurePropertyImpl(String property, String newVal) try { LOG.info("Reconfiguring " + property + " to " + newVal); this.refreshVolumes(newVal); - return conf.get(DFS_DATANODE_DATA_DIR_KEY); + return getConf().get(DFS_DATANODE_DATA_DIR_KEY); } catch (IOException e) { rootException = e; } finally { @@ -650,7 +649,7 @@ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException { // Use the existing StorageLocation to detect storage type changes. Map existingLocations = new HashMap<>(); - for (StorageLocation loc : getStorageLocations(this.conf)) { + for (StorageLocation loc : getStorageLocations(getConf())) { existingLocations.put(loc.getFile().getCanonicalPath(), loc); } @@ -846,7 +845,7 @@ private synchronized void removeVolumes( it.remove(); } } - conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs)); + getConf().set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs)); if (ioe != null) { throw ioe; @@ -904,14 +903,14 @@ private static String getHostName(Configuration config) * for information related to the different configuration options and * Http Policy is decided. */ - private void startInfoServer(Configuration conf) + private void startInfoServer() throws IOException { // SecureDataNodeStarter will bind the privileged port to the channel if // the DN is started by JSVC, pass it along. ServerSocketChannel httpServerChannel = secureResources != null ? secureResources.getHttpServerChannel() : null; - this.httpServer = new DatanodeHttpServer(conf, this, httpServerChannel); + httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel); httpServer.start(); if (httpServer.getHttpAddress() != null) { infoPort = httpServer.getHttpAddress().getPort(); @@ -933,24 +932,24 @@ private void startPlugins(Configuration conf) { } } - private void initIpcServer(Configuration conf) throws IOException { + private void initIpcServer() throws IOException { InetSocketAddress ipcAddr = NetUtils.createSocketAddr( - conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY)); + getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY)); // Add all the RPC protocols that the Datanode implements - RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, + RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class, ProtobufRpcEngine.class); ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = new ClientDatanodeProtocolServerSideTranslatorPB(this); BlockingService service = ClientDatanodeProtocolService .newReflectiveBlockingService(clientDatanodeProtocolXlator); - ipcServer = new RPC.Builder(conf) + ipcServer = new RPC.Builder(getConf()) .setProtocol(ClientDatanodeProtocolPB.class) .setInstance(service) .setBindAddress(ipcAddr.getHostName()) .setPort(ipcAddr.getPort()) .setNumHandlers( - conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, + getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false) .setSecretManager(blockPoolTokenSecretManager).build(); @@ -958,29 +957,32 @@ private void initIpcServer(Configuration conf) throws IOException { = new ReconfigurationProtocolServerSideTranslatorPB(this); service = ReconfigurationProtocolService .newReflectiveBlockingService(reconfigurationProtocolXlator); - DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class, service, + DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service, ipcServer); InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = new InterDatanodeProtocolServerSideTranslatorPB(this); service = InterDatanodeProtocolService .newReflectiveBlockingService(interDatanodeProtocolXlator); - DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service, + DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service, ipcServer); TraceAdminProtocolServerSideTranslatorPB traceAdminXlator = new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); - DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService, + DFSUtil.addPBProtocol( + getConf(), + TraceAdminProtocolPB.class, + traceAdminService, ipcServer); LOG.info("Opened IPC server at " + ipcServer.getListenerAddress()); // set service-level authorization security policy - if (conf.getBoolean( + if (getConf().getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { - ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); + ipcServer.refreshServiceAcl(getConf(), new HDFSPolicyProvider()); } } @@ -1071,17 +1073,17 @@ private void shutdownDiskBalancer() { } } - private void initDataXceiver(Configuration conf) throws IOException { + private void initDataXceiver() throws IOException { // find free port or use privileged port provided TcpPeerServer tcpPeerServer; if (secureResources != null) { tcpPeerServer = new TcpPeerServer(secureResources); } else { - int backlogLength = conf.getInt( + int backlogLength = getConf().getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, - DataNode.getStreamingAddr(conf), backlogLength); + DataNode.getStreamingAddr(getConf()), backlogLength); } if (dnConf.getTransferSocketRecvBufferSize() > 0) { tcpPeerServer.setReceiveBufferSize( @@ -1090,24 +1092,27 @@ private void initDataXceiver(Configuration conf) throws IOException { streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); - xserver = new DataXceiverServer(tcpPeerServer, conf, this); + xserver = new DataXceiverServer(tcpPeerServer, getConf(), this); this.dataXceiverServer = new Daemon(threadGroup, xserver); this.threadGroup.setDaemon(true); // auto destroy when empty - if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, - HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) || - conf.getBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, - HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) { + if (getConf().getBoolean( + HdfsClientConfigKeys.Read.ShortCircuit.KEY, + HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) || + getConf().getBoolean( + HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, + HdfsClientConfigKeys + .DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) { DomainPeerServer domainPeerServer = - getDomainPeerServer(conf, streamingAddr.getPort()); + getDomainPeerServer(getConf(), streamingAddr.getPort()); if (domainPeerServer != null) { this.localDataXceiverServer = new Daemon(threadGroup, - new DataXceiverServer(domainPeerServer, conf, this)); + new DataXceiverServer(domainPeerServer, getConf(), this)); LOG.info("Listening on UNIX domain socket: " + domainPeerServer.getBindPath()); } } - this.shortCircuitRegistry = new ShortCircuitRegistry(conf); + this.shortCircuitRegistry = new ShortCircuitRegistry(getConf()); } private static DomainPeerServer getDomainPeerServer(Configuration conf, @@ -1288,26 +1293,23 @@ boolean areCacheReportsDisabledForTests() { /** * This method starts the data node with the specified conf. * - * @param conf - the configuration - * if conf's CONFIG_PROPERTY_SIMULATED property is set - * then a simulated storage based data node is created. + * If conf's CONFIG_PROPERTY_SIMULATED property is set + * then a simulated storage based data node is created. * * @param dataDirs - only for a non-simulated storage data node * @throws IOException */ - void startDataNode(Configuration conf, - List dataDirs, + void startDataNode(List dataDirectories, SecureResources resources ) throws IOException { // settings global for all BPs in the Data Node this.secureResources = resources; synchronized (this) { - this.dataDirs = dataDirs; + this.dataDirs = dataDirectories; } - this.conf = conf; - this.dnConf = new DNConf(conf); - checkSecureConfig(dnConf, conf, resources); + this.dnConf = new DNConf(this); + checkSecureConfig(dnConf, getConf(), resources); if (dnConf.maxLockedMemory > 0) { if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) { @@ -1347,10 +1349,10 @@ void startDataNode(Configuration conf, // global DN settings registerMXBean(); - initDataXceiver(conf); - startInfoServer(conf); + initDataXceiver(); + startInfoServer(); pauseMonitor = new JvmPauseMonitor(); - pauseMonitor.init(conf); + pauseMonitor.init(getConf()); pauseMonitor.start(); // BlockPoolTokenSecretManager is required to create ipc server. @@ -1360,24 +1362,24 @@ void startDataNode(Configuration conf, dnUserName = UserGroupInformation.getCurrentUser().getShortUserName(); LOG.info("dnUserName = " + dnUserName); LOG.info("supergroup = " + supergroup); - initIpcServer(conf); + initIpcServer(); - metrics = DataNodeMetrics.create(conf, getDisplayName()); + metrics = DataNodeMetrics.create(getConf(), getDisplayName()); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); - ecWorker = new ErasureCodingWorker(conf, this); + ecWorker = new ErasureCodingWorker(getConf(), this); blockRecoveryWorker = new BlockRecoveryWorker(this); blockPoolManager = new BlockPoolManager(this); - blockPoolManager.refreshNamenodes(conf); + blockPoolManager.refreshNamenodes(getConf()); // Create the ReadaheadPool from the DataNode context so we can // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); - saslClient = new SaslDataTransferClient(dnConf.conf, + saslClient = new SaslDataTransferClient(dnConf.getConf(), dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); - startMetricsLogger(conf); + startMetricsLogger(); } /** @@ -1592,10 +1594,10 @@ void initBlockPool(BPOfferService bpos) throws IOException { // failures. checkDiskError(); - data.addBlockPool(nsInfo.getBlockPoolID(), conf); + data.addBlockPool(nsInfo.getBlockPoolID(), getConf()); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); - initDirectoryScanner(conf); - initDiskBalancer(data, conf); + initDirectoryScanner(getConf()); + initDiskBalancer(data, getConf()); } List getAllBpOs() { @@ -1616,10 +1618,10 @@ int getBpOsCount() { */ private void initStorage(final NamespaceInfo nsInfo) throws IOException { final FsDatasetSpi.Factory> factory - = FsDatasetSpi.Factory.getFactory(conf); + = FsDatasetSpi.Factory.getFactory(getConf()); if (!factory.isSimulated()) { - final StartupOption startOpt = getStartupOption(conf); + final StartupOption startOpt = getStartupOption(getConf()); if (startOpt == null) { throw new IOException("Startup option not set."); } @@ -1639,7 +1641,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException { synchronized(this) { if (data == null) { - data = factory.newInstance(this, storage, conf); + data = factory.newInstance(this, storage, getConf()); } } } @@ -1720,7 +1722,7 @@ public Socket newSocket() throws IOException { */ DatanodeProtocolClientSideTranslatorPB connectToNN( InetSocketAddress nnAddr) throws IOException { - return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf); + return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf()); } /** @@ -1733,7 +1735,7 @@ DatanodeProtocolClientSideTranslatorPB connectToNN( DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN( InetSocketAddress lifelineNnAddr) throws IOException { return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr, - conf); + getConf()); } public static InterDatanodeProtocol createInterDataNodeProtocolProxy( @@ -2388,7 +2390,7 @@ public void run() { unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtilClient.getSmallBufferSize(conf))); + DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, true, DataNode.this, null, cachingStrategy); @@ -2508,7 +2510,7 @@ public void runDatanodeDaemon() throws IOException { } ipcServer.setTracer(tracer); ipcServer.start(); - startPlugins(conf); + startPlugins(getConf()); } /** @@ -3051,8 +3053,8 @@ public void refreshNamenodes(Configuration conf) throws IOException { @Override // ClientDatanodeProtocol public void refreshNamenodes() throws IOException { checkSuperuserPrivilege(); - conf = new Configuration(); - refreshNamenodes(conf); + setConf(new Configuration()); + refreshNamenodes(getConf()); } @Override // ClientDatanodeProtocol @@ -3327,8 +3329,8 @@ IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout, Token blockToken) throws IOException { - return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient, - NetUtils.getDefaultSocketFactory(getConf()), false, + return DFSUtilClient.connectToDN(datanodeID, timeout, getConf(), + saslClient, NetUtils.getDefaultSocketFactory(getConf()), false, getDataEncryptionKeyFactoryForBlock(block), blockToken); } @@ -3341,7 +3343,7 @@ private void initOOBTimeout() { final int numOobTypes = oobEnd - oobStart + 1; oobTimeouts = new long[numOobTypes]; - final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY, + final String[] ele = getConf().get(DFS_DATANODE_OOB_TIMEOUT_KEY, DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(","); for (int i = 0; i < numOobTypes; i++) { oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0; @@ -3367,10 +3369,9 @@ public long getOOBTimeout(Status status) * Start a timer to periodically write DataNode metrics to the log file. This * behavior can be disabled by configuration. * - * @param metricConf */ - protected void startMetricsLogger(Configuration metricConf) { - long metricsLoggerPeriodSec = metricConf.getInt( + protected void startMetricsLogger() { + long metricsLoggerPeriodSec = getConf().getInt( DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 29db702cf2..2d50c75763 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -116,7 +116,7 @@ public void setupMocks() throws Exception { File dnDataDir = new File(new File(TEST_BUILD_DATA, "dfs"), "data"); conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString()); Mockito.doReturn(conf).when(mockDn).getConf(); - Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf(); + Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) .when(mockDn).getMetrics(); @@ -338,7 +338,7 @@ public void testBPInitErrorHandling() throws Exception { new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data"); conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString()); Mockito.doReturn(conf).when(mockDn).getConf(); - Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf(); + Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")). when(mockDn).getMetrics(); final AtomicInteger count = new AtomicInteger(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index c21cc86783..b2bfe4998a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@ -171,12 +171,13 @@ private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) conf.setBoolean( DFS_DATANODE_NON_LOCAL_LAZY_PERSIST, nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED); - DNConf dnConf = new DNConf(conf); + DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class); DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class); DataNode mockDn = mock(DataNode.class); - when(mockDn.getDnConf()).thenReturn(dnConf); when(mockDn.getConf()).thenReturn(conf); + DNConf dnConf = new DNConf(mockDn); + when(mockDn.getDnConf()).thenReturn(dnConf); when(mockDn.getMetrics()).thenReturn(mockMetrics); when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg); return mockDn; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index a330fbfb27..179b617211 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -163,11 +163,11 @@ public void setUp() throws IOException { storage = mock(DataStorage.class); this.conf = new Configuration(); this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); - final DNConf dnConf = new DNConf(conf); when(datanode.getConf()).thenReturn(conf); + final DNConf dnConf = new DNConf(datanode); when(datanode.getDnConf()).thenReturn(dnConf); - final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf); + final BlockScanner disabledBlockScanner = new BlockScanner(datanode); when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner); final ShortCircuitRegistry shortCircuitRegistry = new ShortCircuitRegistry(conf); @@ -326,7 +326,7 @@ public void testChangeVolumeWithRunningCheckDirs() throws IOException { RoundRobinVolumeChoosingPolicy blockChooser = new RoundRobinVolumeChoosingPolicy<>(); conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - final BlockScanner blockScanner = new BlockScanner(datanode, conf); + final BlockScanner blockScanner = new BlockScanner(datanode); final FsVolumeList volumeList = new FsVolumeList( Collections.emptyList(), blockScanner, blockChooser); final List oldVolumes = new ArrayList<>();