From 2e140523d3ccb27809cde4a55e95f7e0006c028f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 17 Oct 2014 18:27:42 -0700 Subject: [PATCH] HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++ .../org/apache/hadoop/hdfs/DFSClient.java | 25 ++++++++++--------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 ++ .../apache/hadoop/hdfs/DFSOutputStream.java | 3 +-- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fc84c579ae..62201c091a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -971,6 +971,8 @@ Release 2.6.0 - UNRELEASED HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support FSIMAGE_NAME_OPTIMIZATION. (szetszwo) + HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 8a7c8ebcb8..68e355aa93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; -import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension - .EncryptedKeyVersion; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; @@ -59,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; @@ -72,7 +71,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -89,10 +87,10 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; @@ -107,7 +105,9 @@ import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CacheFlag; @@ -135,8 +135,8 @@ import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.net.Peer; @@ -202,7 +202,6 @@ import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -217,17 +216,16 @@ import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; +import org.htrace.Sampler; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; -import org.htrace.Sampler; -import org.htrace.Span; -import org.htrace.Trace; -import org.htrace.TraceScope; -import org.htrace.impl.ProbabilitySampler; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -294,6 +292,7 @@ public static class Conf { final int ioBufferSize; final ChecksumOpt defaultChecksumOpt; final int writePacketSize; + final int writeMaxPackets; final int socketTimeout; final int socketCacheCapacity; final long socketCacheExpiry; @@ -364,6 +363,8 @@ public Conf(Configuration conf) { /** dfs.write.packet.size is an internal config variable */ writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY, + DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT); defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); defaultReplication = (short) conf.getInt( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 4391578474..eb0a735fa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -50,6 +50,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; + public static final String DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets"; + public static final int DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80; public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index e0d412647c..b6b4846d00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -129,7 +129,6 @@ @InterfaceAudience.Private public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { - private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB /** * Number of times to retry creating a file when there are transient * errors (typically related to encryption zones and KeyProvider operations). @@ -1783,7 +1782,7 @@ private void waitAndQueueCurrentPacket() throws IOException { synchronized (dataQueue) { try { // If queue is full, then wait till we have enough space - while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) { + while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) { try { dataQueue.wait(); } catch (InterruptedException e) {