HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-09-26 11:08:25 -07:00
parent 861b52db24
commit bf37d3d80e
53 changed files with 173 additions and 118 deletions

View File

@ -32,4 +32,28 @@
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.DFSInputStream"/>
<Field name="tcpReadsDisabledForTesting"/>
<Bug pattern="MS_SHOULD_BE_FINAL"/>
</Match>
<!--
ResponseProccessor is thread that is designed to catch RuntimeException.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
<Method name="run" />
<Bug pattern="REC_CATCH_EXCEPTION" />
</Match>
<!--
We use a separate lock to guard cachingStrategy in order to separate
locks for p-reads from seek + read invocations.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.DFSInputStream" />
<Field name="cachingStrategy" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -31,8 +31,6 @@
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -56,7 +54,7 @@
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
@ -69,13 +67,16 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class to create BlockReader implementations.
*/
@InterfaceAudience.Private
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
public static class FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException {
@ -551,14 +552,14 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": closing stale domain peer " + peer, e);
}
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
} else {
// Handle an I/O error we got when using a newly created socket.
// We temporarily disable the domain socket path for a few minutes in
// this case, to prevent wasting more time on it.
LOG.warn(this + ": I/O error requesting file descriptors. " +
"Disabling domain socket " + peer.getDomainSocket(), e);
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
clientContext.getDomainSocketFactory()
.disableDomainSocketPath(pathInfo.getPath());
return null;
@ -617,7 +618,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
return null;
} finally {
if (replica == null) {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
}
}
case ERROR_UNSUPPORTED:
@ -685,7 +686,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
blockReader = getRemoteBlockReader(peer);
return blockReader;
} catch (IOException ioe) {
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
if (isSecurityException(ioe)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": got security exception while constructing " +
@ -712,7 +713,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
}
} finally {
if (blockReader == null) {
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
}
}
}
@ -769,7 +770,7 @@ private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
}
} finally {
if (blockReader == null) {
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
}
}
}

View File

@ -53,8 +53,6 @@
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -152,6 +150,7 @@
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
@ -186,6 +185,9 @@
import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
@ -200,7 +202,7 @@
@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
private final Configuration conf;
@ -304,7 +306,7 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.ugi = UserGroupInformation.getCurrentUser();
@ -317,6 +319,7 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
ProxyAndInfo<ClientProtocol> proxyInfo = null;
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
if (numResponseToDrop > 0) {
// This case is used for testing.
LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
@ -728,7 +731,7 @@ public static class Renewer extends TokenRenewer {
static {
//Ensure that HDFS Configuration files are loaded before trying to use
// the renewer.
HdfsConfiguration.init();
HdfsConfigurationLoader.init();
}
@Override
@ -1993,7 +1996,7 @@ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
} finally {
IOUtils.cleanup(null, pair.in, pair.out);
IOUtilsClient.cleanup(null, pair.in, pair.out);
}
}
@ -3026,7 +3029,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
return peer;
} finally {
if (!success) {
IOUtils.cleanup(LOG, peer);
IOUtilsClient.cleanup(LOG, peer);
IOUtils.closeSocket(sock);
}
}
@ -3098,11 +3101,11 @@ public void setKeyProvider(KeyProvider provider) {
/**
* Probe for encryption enabled on this filesystem.
* See {@link DFSUtil#isHDFSEncryptionEnabled(Configuration)}
* See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)}
* @return true if encryption is enabled
*/
public boolean isHDFSEncryptionEnabled() {
return DFSUtil.isHDFSEncryptionEnabled(this.conf);
return DFSUtilClient.isHDFSEncryptionEnabled(this.conf);
}
/**

View File

@ -30,12 +30,15 @@
@VisibleForTesting
@InterfaceAudience.Private
public class DFSClientFaultInjector {
public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
private static DFSClientFaultInjector instance = new DFSClientFaultInjector();
public static AtomicLong exceptionNum = new AtomicLong(0);
public static DFSClientFaultInjector get() {
return instance;
}
public static void set(DFSClientFaultInjector instance) {
DFSClientFaultInjector.instance = instance;
}
public boolean corruptPacket() {
return false;

View File

@ -44,8 +44,8 @@
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class DFSInotifyEventInputStream {
public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
.class);
public static final Logger LOG = LoggerFactory.getLogger(
DFSInotifyEventInputStream.class);
/**
* The trace sampler to use when making RPCs to the NameNode.

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -590,6 +591,29 @@ public static Peer peerFromSocketAndKey(
}
}
public static int getIoFileBufferSize(Configuration conf) {
return conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
}
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI},
* returning true if that property contains a non-empty, non-whitespace
* string.
* @param conf configuration to probe
* @return true if encryption is considered enabled.
*/
public static boolean isHDFSEncryptionEnabled(Configuration conf) {
return !conf.getTrimmed(
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
}
public static InetSocketAddress getNNAddress(String address) {
return NetUtils.createSocketAddr(address,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);

View File

@ -40,8 +40,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@ -94,6 +92,9 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
@ -117,7 +118,7 @@
@InterfaceAudience.Private
class DataStreamer extends Daemon {
static final Log LOG = LogFactory.getLog(DataStreamer.class);
static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
/**
* Create a socket for a write pipeline
@ -1229,7 +1230,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
@ -1494,7 +1495,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn);
//

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* Load default HDFS configuration resources.
*/
@InterfaceAudience.Private
class HdfsConfigurationLoader {
static {
// adds the default resources
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
/**
* This method is here so that when invoked, default resources are added if
* they haven't already been previously loaded. Upon loading this class, the
* static initializer block above will be executed to add the default
* resources. It is safe for this method to be called multiple times
* as the static initializer block will only get invoked once.
*/
public static void init() {
}
}

View File

@ -27,8 +27,6 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient;
@ -39,6 +37,8 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
@ -73,7 +73,7 @@
*/
@InterfaceAudience.Private
public class LeaseRenewer {
static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

View File

@ -977,6 +977,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
via Colin P. McCabe)
HDFS-8053. Move DFSIn/OutputStream and related classes to
hadoop-hdfs-client. (Mingliang Liu via wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -73,15 +73,6 @@
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
ResponseProccessor is thread that is designed to catch RuntimeException.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
<Method name="run" />
<Bug pattern="REC_CATCH_EXCEPTION" />
</Match>
<!--
lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
See the comments in BackupImage for justification.
@ -196,14 +187,4 @@
<Method name="assertAllResultsEqual" />
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
</Match>
<!--
We use a separate lock to guard cachingStrategy in order to separate
locks for p-reads from seek + read invocations.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.DFSInputStream" />
<Field name="cachingStrategy" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

View File

@ -67,7 +67,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -1441,27 +1440,4 @@ public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
return cryptoProvider;
}
public static int getIoFileBufferSize(Configuration conf) {
return conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
}
public static int getSmallBufferSize(Configuration conf) {
return Math.min(getIoFileBufferSize(conf) / 2, 512);
}
/**
* Probe for HDFS Encryption being enabled; this uses the value of
* the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
* returning true if that property contains a non-empty, non-whitespace
* string.
* @param conf configuration to probe
* @return true if encryption is considered enabled.
*/
public static boolean isHDFSEncryptionEnabled(Configuration conf) {
return !conf.getTrimmed(
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
}
}

View File

@ -31,9 +31,7 @@ public class HdfsConfiguration extends Configuration {
addDeprecatedKeys();
// adds the default resources
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
HdfsConfigurationLoader.init();
}
public HdfsConfiguration() {
@ -52,8 +50,9 @@ public HdfsConfiguration(Configuration conf) {
* This method is here so that when invoked, HdfsConfiguration is class-loaded if
* it hasn't already been previously loaded. Upon loading the class, the static
* initializer block above will be executed to add the deprecated keys and to add
* the default resources. It is safe for this method to be called multiple times
* as the static initializer block will only get invoked once.
* the default resources via {@link HdfsConfigurationLoader#init()}. It is
* safe for this method to be called multiple times as the static initializer
* block will only get invoked once.
*
* This replaces the previously, dangerous practice of other classes calling
* Configuration.addDefaultResource("hdfs-default.xml") directly without loading

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -877,7 +878,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
}
public DistributedFileSystem getDistributedFileSystem() {

View File

@ -38,7 +38,7 @@
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@ -248,7 +248,7 @@ class BlockReceiver implements Closeable {
out.getClass());
}
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), DFSUtil.getSmallBufferSize(
streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
datanode.getConf())));
// write data chunk header if creating a new replica
if (isCreate) {

View File

@ -34,7 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@ -111,7 +111,7 @@ class BlockSender implements java.io.Closeable {
private static final int IO_FILE_BUFFER_SIZE;
static {
HdfsConfiguration conf = new HdfsConfiguration();
IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
}
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);

View File

@ -108,6 +108,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
@ -2156,7 +2157,7 @@ public void run() {
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtil.getSmallBufferSize(conf)));
DFSUtilClient.getSmallBufferSize(conf)));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null, cachingStrategy);

View File

@ -49,7 +49,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -134,8 +134,8 @@ private DataXceiver(Peer peer, DataNode datanode,
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf());
this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf());
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf());
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf());
remoteAddress = peer.getRemoteAddressString();
final int colonIdx = remoteAddress.indexOf(':');
remoteAddressWithoutPort =

View File

@ -38,7 +38,7 @@
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
@ -111,7 +111,7 @@ class BlockPoolSlice {
}
}
this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
this.deleteDuplicateReplicas = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,

View File

@ -57,7 +57,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -263,7 +263,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated =
@ -956,7 +956,7 @@ private static void computeChecksum(File srcMeta, File dstMeta,
File blockFile, int smallBufferSize, final Configuration conf)
throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
DFSUtil.getIoFileBufferSize(conf));
DFSUtilClient.getIoFileBufferSize(conf));
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];

View File

@ -21,7 +21,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@ -240,7 +240,7 @@ public void run() {
boolean succeeded = false;
final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) {
int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
// No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,

View File

@ -43,7 +43,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -115,7 +115,7 @@ public static TransferResult getResultForCode(int code){
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
}
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);

View File

@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;

View File

@ -982,7 +982,7 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(s, writeTimeout),
DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request

View File

@ -28,8 +28,8 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.net.NetUtils;

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

View File

@ -124,8 +124,8 @@ public class TestClientProtocolForPipelineRecovery {
public void testPipelineRecoveryForLastBlock() throws IOException {
DFSClientFaultInjector faultInjector
= Mockito.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
DFSClientFaultInjector.instance = faultInjector;
DFSClientFaultInjector oldInjector = DFSClientFaultInjector.get();
DFSClientFaultInjector.set(faultInjector);
Configuration conf = new HdfsConfiguration();
conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
@ -153,7 +153,7 @@ public void testPipelineRecoveryForLastBlock() throws IOException {
+ " corrupt replicas.");
}
} finally {
DFSClientFaultInjector.instance = oldInjector;
DFSClientFaultInjector.set(oldInjector);
if (cluster != null) {
cluster.shutdown();
}

View File

@ -76,7 +76,7 @@ public class TestCrcCorruption {
@Before
public void setUp() throws IOException {
faultInjector = Mockito.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector.instance = faultInjector;
DFSClientFaultInjector.set(faultInjector);
}
/**

View File

@ -903,16 +903,16 @@ public void testEncryptionProbe() throws Throwable {
Configuration conf = new Configuration(false);
conf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
assertFalse("encryption enabled on no provider key",
DFSUtil.isHDFSEncryptionEnabled(conf));
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
assertFalse("encryption enabled on empty provider key",
DFSUtil.isHDFSEncryptionEnabled(conf));
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "\n\t\n");
assertFalse("encryption enabled on whitespace provider key",
DFSUtil.isHDFSEncryptionEnabled(conf));
DFSUtilClient.isHDFSEncryptionEnabled(conf));
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "http://hadoop.apache.org");
assertTrue("encryption disabled on valid provider key",
DFSUtil.isHDFSEncryptionEnabled(conf));
DFSUtilClient.isHDFSEncryptionEnabled(conf));
}
}

View File

@ -292,9 +292,8 @@ public void testHedgedReadLoopTooManyTimes() throws IOException {
hedgedReadTimeoutMillis);
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
// Set up the InjectionHandler
DFSClientFaultInjector.instance = Mockito
.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
DFSClientFaultInjector injector = DFSClientFaultInjector.get();
final int sleepMs = 100;
Mockito.doAnswer(new Answer<Void>() {
@Override
@ -368,9 +367,8 @@ public void testMaxOutHedgedReadPool() throws IOException,
initialHedgedReadTimeoutMillis);
// Set up the InjectionHandler
DFSClientFaultInjector.instance = Mockito
.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
DFSClientFaultInjector injector = DFSClientFaultInjector.get();
// make preads sleep for 50ms
Mockito.doAnswer(new Answer<Void>() {
@Override