HDFS-7696. In FsDatasetImpl, the getBlockInputStream(..) and getTmpInputStreams(..) methods may leak file descriptors.
This commit is contained in:
parent
df01337b80
commit
d085eb15d7
@ -821,6 +821,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7423. various typos and message formatting fixes in nfs daemon and
|
||||
doc. (Charles Lamb via yliu)
|
||||
|
||||
HDFS-7696. In FsDatasetImpl, the getBlockInputStream(..) and
|
||||
getTmpInputStreams(..) methods may leak file descriptors. (szetszwo)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -18,8 +18,6 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -46,15 +46,13 @@
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
@ -67,7 +65,6 @@
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
@ -75,8 +72,8 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
@ -91,7 +88,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
@ -109,6 +106,9 @@
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**************************************************
|
||||
* FSDataset manages a set of data blocks. Each block
|
||||
* has a unique name and an extent on disk.
|
||||
@ -571,18 +571,12 @@ public InputStream getBlockInputStream(ExtendedBlock b,
|
||||
if (isNativeIOAvailable) {
|
||||
return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
|
||||
} else {
|
||||
RandomAccessFile blockInFile;
|
||||
try {
|
||||
blockInFile = new RandomAccessFile(blockFile, "r");
|
||||
return openAndSeek(blockFile, seekOffset);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
throw new IOException("Block " + b + " is not valid. " +
|
||||
"Expected block file at " + blockFile + " does not exist.");
|
||||
}
|
||||
|
||||
if (seekOffset > 0) {
|
||||
blockInFile.seek(seekOffset);
|
||||
}
|
||||
return new FileInputStream(blockInFile.getFD());
|
||||
}
|
||||
}
|
||||
|
||||
@ -627,24 +621,14 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
|
||||
* Returns handles to the block file and its metadata file
|
||||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
long blkOffset, long ckoff) throws IOException {
|
||||
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
long blkOffset, long metaOffset) throws IOException {
|
||||
ReplicaInfo info = getReplicaInfo(b);
|
||||
FsVolumeReference ref = info.getVolume().obtainReference();
|
||||
try {
|
||||
File blockFile = info.getBlockFile();
|
||||
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
|
||||
if (blkOffset > 0) {
|
||||
blockInFile.seek(blkOffset);
|
||||
}
|
||||
File metaFile = info.getMetaFile();
|
||||
RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
|
||||
if (ckoff > 0) {
|
||||
metaInFile.seek(ckoff);
|
||||
}
|
||||
InputStream blockInStream = new FileInputStream(blockInFile.getFD());
|
||||
InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
|
||||
try {
|
||||
InputStream metaInStream = new FileInputStream(metaInFile.getFD());
|
||||
InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
|
||||
return new ReplicaInputStreams(blockInStream, metaInStream, ref);
|
||||
} catch (IOException e) {
|
||||
IOUtils.cleanup(null, blockInStream);
|
||||
@ -656,6 +640,21 @@ public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
}
|
||||
}
|
||||
|
||||
private static FileInputStream openAndSeek(File file, long offset)
|
||||
throws IOException {
|
||||
RandomAccessFile raf = null;
|
||||
try {
|
||||
raf = new RandomAccessFile(file, "r");
|
||||
if (offset > 0) {
|
||||
raf.seek(offset);
|
||||
}
|
||||
return new FileInputStream(raf.getFD());
|
||||
} catch(IOException ioe) {
|
||||
IOUtils.cleanup(null, raf);
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
static File moveBlockFiles(Block b, File srcfile, File destdir)
|
||||
throws IOException {
|
||||
final File dstfile = new File(destdir, b.getBlockName());
|
||||
|
Loading…
Reference in New Issue
Block a user