From 504b801ca0e7fd3944872d3214539feb2d614f06 Mon Sep 17 00:00:00 2001 From: Eli Collins Date: Fri, 12 Aug 2011 19:57:15 +0000 Subject: [PATCH] HDFS-73. DFSOutputStream does not close all the sockets. Contributed by Uma Maheswara Rao G git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157232 13f79535-47bb-0310-9956-ffa450edef68 --- hdfs/CHANGES.txt | 3 +++ .../apache/hadoop/hdfs/DFSOutputStream.java | 26 +++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index 5148ea243d..02c8864284 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -964,6 +964,9 @@ Trunk (unreleased changes) HDFS-2240. Fix a deadlock in LeaseRenewer by enforcing lock acquisition ordering. (szetszwo) + HDFS-73. DFSOutputStream does not close all the sockets. + (Uma Maheswara Rao G via eli) + BREAKDOWN OF HDFS-1073 SUBTASKS HDFS-1521. Persist transaction ID on disk between NN restarts. diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java index d267bf8a8d..03879338dd 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -63,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.io.EnumSetWritable; @@ -606,6 +604,7 @@ private void closeStream() { try { blockStream.close(); } catch (IOException e) { + setLastException(e); } finally { blockStream = null; } @@ -614,10 +613,20 @@ private void closeStream() { try { blockReplyStream.close(); } catch (IOException e) { + setLastException(e); } finally { blockReplyStream = null; } } + if (null != s) { + try { + s.close(); + } catch (IOException e) { + setLastException(e); + } finally { + s = null; + } + } } // @@ -1003,16 +1012,20 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, persistBlocks.set(true); boolean result = false; + DataOutputStream out = null; try { + assert null == s : "Previous socket unclosed"; s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); // // Xmit header info to datanode // - DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + out = new DataOutputStream(new BufferedOutputStream( NetUtils.getOutputStream(s, writeTimeout), FSConstants.SMALL_BUFFER_SIZE)); + + assert null == blockReplyStream : "Previous blockReplyStream unclosed"; blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); // send the request @@ -1038,7 +1051,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, + firstBadLink); } } - + assert null == blockStream : "Previous blockStream unclosed"; blockStream = out; result = true; // success @@ -1059,12 +1072,15 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, } hasError = true; setLastException(ie); - blockReplyStream = null; result = false; // error } finally { if (!result) { IOUtils.closeSocket(s); s = null; + IOUtils.closeStream(out); + out = null; + IOUtils.closeStream(blockReplyStream); + blockReplyStream = null; } } return result;