diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 074a16a915..56e2657c4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -556,6 +556,9 @@ Release 2.0.0 - UNRELEASED HDFS-3314. HttpFS operation for getHomeDirectory is incorrect. (tucu) + HDFS-3319. Change DFSOutputStream to not to start a thread in constructors. + (szetszwo) + BREAKDOWN OF HDFS-1623 SUBTASKS HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 2c042ffbc4..43b1ba6fb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1046,9 +1046,9 @@ public DFSOutputStream create(String src, if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } - final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag, - createParent, replication, blockSize, progress, buffersize, - dfsClientConf.createChecksum()); + final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, + src, masked, flag, createParent, replication, blockSize, progress, + buffersize, dfsClientConf.createChecksum()); leaserenewer.put(src, result, this); return result; } @@ -1095,7 +1095,7 @@ public DFSOutputStream primitiveCreate(String src, DataChecksum checksum = DataChecksum.newDataChecksum( dfsClientConf.checksumType, bytesPerChecksum); - result = new DFSOutputStream(this, src, absPermission, + result = DFSOutputStream.newStreamForCreate(this, src, absPermission, flag, createParent, replication, blockSize, progress, buffersize, checksum); } @@ -1154,7 +1154,7 @@ private DFSOutputStream callAppend(HdfsFileStatus stat, String src, UnsupportedOperationException.class, UnresolvedPathException.class); } - return new DFSOutputStream(this, src, buffersize, progress, + return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress, lastBlock, stat, dfsClientConf.createChecksum()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 42c41dd601..fd09882be9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -100,7 +100,7 @@ * starts sending packets from the dataQueue. ****************************************************************/ @InterfaceAudience.Private -public final class DFSOutputStream extends FSOutputSummer implements Syncable { +public class DFSOutputStream extends FSOutputSummer implements Syncable { private final DFSClient dfsClient; private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB private Socket s; @@ -1234,14 +1234,11 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres this.checksum = checksum; } - /** - * Create a new output stream to the given DataNode. - * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long) - */ - DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, - boolean createParent, short replication, long blockSize, Progressable progress, - int buffersize, DataChecksum checksum) - throws IOException { + /** Construct a new output stream for creating a file. */ + private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, + EnumSet flag, boolean createParent, short replication, + long blockSize, Progressable progress, int buffersize, + DataChecksum checksum) throws IOException { this(dfsClient, src, blockSize, progress, checksum, replication); computePacketChunkSize(dfsClient.getConf().writePacketSize, @@ -1261,14 +1258,21 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres UnresolvedPathException.class); } streamer = new DataStreamer(); - streamer.start(); } - /** - * Create a new output stream to the given DataNode. - * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) - */ - DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress, + static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, + FsPermission masked, EnumSet flag, boolean createParent, + short replication, long blockSize, Progressable progress, int buffersize, + DataChecksum checksum) throws IOException { + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, masked, + flag, createParent, replication, blockSize, progress, buffersize, + checksum); + out.streamer.start(); + return out; + } + + /** Construct a new output stream for append. */ + private DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum) throws IOException { this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication()); @@ -1286,7 +1290,15 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres checksum.getBytesPerChecksum()); streamer = new DataStreamer(); } - streamer.start(); + } + + static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, + int buffersize, Progressable progress, LocatedBlock lastBlock, + HdfsFileStatus stat, DataChecksum checksum) throws IOException { + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, buffersize, + progress, lastBlock, stat, checksum); + out.streamer.start(); + return out; } private void computePacketChunkSize(int psize, int csize) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java index f931b8e169..1bdb497927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java @@ -99,7 +99,7 @@ public void testClientName() throws IOException { clientName.startsWith("DFSClient_NONMAPREDUCE_")); } -// @Test + @Test public void testRenewal() throws Exception { // Keep track of how many times the lease gets renewed final AtomicInteger leaseRenewalCount = new AtomicInteger(); @@ -135,7 +135,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { * to several DFSClients with the same name, the first of which has no files * open. Previously, this was causing the lease to not get renewed. */ -// @Test + @Test public void testManyDfsClientsWhereSomeNotOpen() throws Exception { // First DFSClient has no files open so doesn't renew leases. final DFSClient mockClient1 = createMockClient(); @@ -181,7 +181,7 @@ public Boolean get() { renewer.closeFile(filePath, mockClient2); } -// @Test + @Test public void testThreadName() throws Exception { DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); String filePath = "/foo";