HDFS-3319. Change DFSOutputStream to not to start a thread in constructors.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1330535 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2012-04-25 20:17:20 +00:00
Parent: 2584779166
Commit: 3b773da036

4 changed files with 39 additions and 24 deletions
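
Background on why the change matters (an explanatory note, not part of the patch text): a constructor that calls streamer.start() hands the new DataStreamer thread a reference to a DFSOutputStream that is not yet fully constructed, i.e. `this` escapes the constructor. The patch therefore makes the constructors private and starts the thread from static factory methods (newStreamForCreate, newStreamForAppend) only after the constructor has returned. A minimal sketch of the pattern, using hypothetical names rather than the real HDFS classes:

    // Sketch only; "PacketStream" and its worker thread are hypothetical stand-ins.
    class PacketStream {
      private final Thread worker;

      private PacketStream() {
        // Build state only; the thread is NOT started here, so the worker can
        // never observe a partially constructed "this".
        worker = new Thread(new Runnable() {
          public void run() { /* drain the data queue, send packets, ... */ }
        });
      }

      // Factory method: start the worker only after construction has completed.
      static PacketStream newStream() {
        final PacketStream s = new PacketStream();
        s.worker.start();
        return s;
      }
    }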

CHANGES.txt

@@ -556,6 +556,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3314. HttpFS operation for getHomeDirectory is incorrect. (tucu)
 
+    HDFS-3319. Change DFSOutputStream to not to start a thread in constructors.
+    (szetszwo)
+
 BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

DFSClient.java

@@ -1046,9 +1046,9 @@ public DFSOutputStream create(String src,
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
-    final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
-        createParent, replication, blockSize, progress, buffersize,
-        dfsClientConf.createChecksum());
+    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
+        src, masked, flag, createParent, replication, blockSize, progress,
+        buffersize, dfsClientConf.createChecksum());
     leaserenewer.put(src, result, this);
     return result;
   }
@@ -1095,7 +1095,7 @@ public DFSOutputStream primitiveCreate(String src,
       DataChecksum checksum = DataChecksum.newDataChecksum(
           dfsClientConf.checksumType,
           bytesPerChecksum);
-      result = new DFSOutputStream(this, src, absPermission,
+      result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);
     }
@@ -1154,7 +1154,7 @@ private DFSOutputStream callAppend(HdfsFileStatus stat, String src,
           UnsupportedOperationException.class,
           UnresolvedPathException.class);
     }
-    return new DFSOutputStream(this, src, buffersize, progress,
+    return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
         lastBlock, stat, dfsClientConf.createChecksum());
   }

DFSOutputStream.java

@@ -100,7 +100,7 @@
  * starts sending packets from the dataQueue.
  ****************************************************************/
 @InterfaceAudience.Private
-public final class DFSOutputStream extends FSOutputSummer implements Syncable {
+public class DFSOutputStream extends FSOutputSummer implements Syncable {
   private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private Socket s;
@@ -1234,14 +1234,11 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres
     this.checksum = checksum;
   }
 
-  /**
-   * Create a new output stream to the given DataNode.
-   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
-   */
-  DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
-      boolean createParent, short replication, long blockSize, Progressable progress,
-      int buffersize, DataChecksum checksum)
-      throws IOException {
+  /** Construct a new output stream for creating a file. */
+  private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
+      EnumSet<CreateFlag> flag, boolean createParent, short replication,
+      long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
     this(dfsClient, src, blockSize, progress, checksum, replication);
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
@@ -1261,14 +1258,21 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres
           UnresolvedPathException.class);
     }
     streamer = new DataStreamer();
-    streamer.start();
   }
 
-  /**
-   * Create a new output stream to the given DataNode.
-   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
-   */
-  DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
+    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, masked,
+        flag, createParent, replication, blockSize, progress, buffersize,
+        checksum);
+    out.streamer.start();
+    return out;
+  }
+
+  /** Construct a new output stream for append. */
+  private DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat,
       DataChecksum checksum) throws IOException {
     this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
@@ -1286,7 +1290,15 @@ private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progres
         checksum.getBytesPerChecksum());
       streamer = new DataStreamer();
     }
-    streamer.start();
+  }
+
+  static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
+      int buffersize, Progressable progress, LocatedBlock lastBlock,
+      HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, buffersize,
+        progress, lastBlock, stat, checksum);
+    out.streamer.start();
+    return out;
   }
 
   private void computePacketChunkSize(int psize, int csize) {

TestLeaseRenewer.java

@@ -99,7 +99,7 @@ public void testClientName() throws IOException {
         clientName.startsWith("DFSClient_NONMAPREDUCE_"));
   }
 
-  // @Test
+  @Test
   public void testRenewal() throws Exception {
     // Keep track of how many times the lease gets renewed
     final AtomicInteger leaseRenewalCount = new AtomicInteger();
@@ -135,7 +135,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
    * to several DFSClients with the same name, the first of which has no files
    * open. Previously, this was causing the lease to not get renewed.
    */
-  // @Test
+  @Test
   public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
     // First DFSClient has no files open so doesn't renew leases.
     final DFSClient mockClient1 = createMockClient();
@@ -181,7 +181,7 @@ public Boolean get() {
       renewer.closeFile(filePath, mockClient2);
   }
 
-  // @Test
+  @Test
   public void testThreadName() throws Exception {
     DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
     String filePath = "/foo";
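
A note on the test changes (an inference from the diff, not stated in the commit): Mockito's default mock maker works by subclassing the target class, so it cannot mock a final class. With the `final` modifier removed from DFSOutputStream in this commit, TestLeaseRenewer can mock the stream again, which is why the three @Test annotations can be uncommented. A minimal, hypothetical sketch assuming the stock Mockito mock maker used by these tests:

    // Hypothetical, self-contained sketch (not part of the patch).
    import org.apache.hadoop.hdfs.DFSOutputStream;
    import org.mockito.Mockito;

    class DFSOutputStreamMockSketch {
      static DFSOutputStream newMockStream() {
        // Mockito generates a runtime subclass of the target; it fails with a
        // MockitoException for final classes, so this call relies on
        // DFSOutputStream no longer being declared final.
        return Mockito.mock(DFSOutputStream.class);
      }
    }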