HDFS-3322. Use HdfsDataInputStream and HdfsDataOutputStream in Hdfs.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1331114 13f79535-47bb-0310-9956-ffa450edef68
Parent: 17e03547dc
Commit: 1a76c82a31
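For context, this change replaces the deprecated inner class DFSClient.DFSDataInputStream with the public org.apache.hadoop.hdfs.client.HdfsDataInputStream (and likewise HdfsDataOutputStream) in client-facing code. A minimal usage sketch of the pattern the updated tests rely on, assuming fs.defaultFS points at an HDFS cluster; the path below is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

public class HdfsStreamExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);      // expected to be a DistributedFileSystem
    Path p = new Path("/tmp/example.txt");     // hypothetical path

    FSDataInputStream raw = fs.open(p);
    // After this change, the stream opened on HDFS can be cast to the public
    // HdfsDataInputStream instead of the deprecated DFSClient.DFSDataInputStream.
    HdfsDataInputStream in = (HdfsDataInputStream) raw;
    try {
      in.readByte();                               // touch the stream so the first block is opened
      ExtendedBlock first = in.getCurrentBlock();  // block currently being read
      long visible = in.getVisibleLength();        // bytes currently visible to this reader
      System.out.println("first block: " + first + ", visible length: " + visible);
    } finally {
      in.close();
    }
  }
}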
@@ -406,6 +406,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3258. Test for HADOOP-8144 (pseudoSortByDistance in
     NetworkTopology for first rack local node). (Junping Du via eli)
 
+    HDFS-3322. Use HdfsDataInputStream and HdfsDataOutputStream in Hdfs.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -43,8 +45,8 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
@@ -88,11 +90,11 @@ public int getUriDefaultPort() {
   }
 
   @Override
-  public FSDataOutputStream createInternal(Path f,
+  public HdfsDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
       int bytesPerChecksum, boolean createParent) throws IOException {
-    return new FSDataOutputStream(dfs.primitiveCreate(getUriPath(f),
+    return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
         absolutePermission, createFlag, createParent, replication, blockSize,
         progress, bufferSize, bytesPerChecksum), getStatistics());
   }
@@ -324,8 +326,9 @@ public void mkdir(Path dir, FsPermission permission, boolean createParent)
     dfs.mkdirs(getUriPath(dir), permission, createParent);
   }
 
+  @SuppressWarnings("deprecation")
   @Override
-  public FSDataInputStream open(Path f, int bufferSize)
+  public HdfsDataInputStream open(Path f, int bufferSize)
       throws IOException, UnresolvedLinkException {
     return new DFSClient.DFSDataInputStream(dfs.open(getUriPath(f),
         bufferSize, verifyChecksum));
@@ -45,7 +45,6 @@
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -44,7 +44,6 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -638,19 +637,18 @@ public FsServerDefaults getServerDefaults() throws IOException {
   // We do not see a need for user to report block checksum errors and do not
   // want to rely on user to report block corruptions.
   @Deprecated
-  @SuppressWarnings("deprecation")
   public boolean reportChecksumFailure(Path f,
     FSDataInputStream in, long inPos,
     FSDataInputStream sums, long sumsPos) {
 
-    if(!(in instanceof DFSDataInputStream && sums instanceof DFSDataInputStream))
-      throw new IllegalArgumentException("Input streams must be types " +
-          "of DFSDataInputStream");
+    if(!(in instanceof HdfsDataInputStream && sums instanceof HdfsDataInputStream))
+      throw new IllegalArgumentException(
+          "Input streams must be types of HdfsDataInputStream");
 
     LocatedBlock lblocks[] = new LocatedBlock[2];
 
     // Find block in data stream.
-    DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
+    HdfsDataInputStream dfsIn = (HdfsDataInputStream) in;
     ExtendedBlock dataBlock = dfsIn.getCurrentBlock();
     if (dataBlock == null) {
       LOG.error("Error: Current block in data stream is null! ");
@@ -663,7 +661,7 @@ public boolean reportChecksumFailure(Path f,
         + dataNode[0]);
 
     // Find block in checksum stream
-    DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
+    HdfsDataInputStream dfsSums = (HdfsDataInputStream) sums;
     ExtendedBlock sumsBlock = dfsSums.getCurrentBlock();
     if (sumsBlock == null) {
       LOG.error("Error: Current block in checksum stream is null! ");
@@ -50,7 +50,7 @@
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -398,22 +398,21 @@ private Response get(
     {
       final int b = bufferSize.getValue(conf);
       final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
-      DFSDataInputStream in = null;
+      HdfsDataInputStream in = null;
       try {
-        in = new DFSClient.DFSDataInputStream(
-            dfsclient.open(fullpath, b, true));
+        in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
         in.seek(offset.getValue());
       } catch(IOException ioe) {
         IOUtils.cleanup(LOG, in);
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final DFSDataInputStream dis = in;
+      final HdfsDataInputStream dis = in;
       final StreamingOutput streaming = new StreamingOutput() {
         @Override
         public void write(final OutputStream out) throws IOException {
           final Long n = length.getValue();
-          DFSDataInputStream dfsin = dis;
+          HdfsDataInputStream dfsin = dis;
           DFSClient client = dfsclient;
           try {
             if (n == null) {
@@ -55,8 +55,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -515,15 +515,14 @@ public void cleanup(FileSystem fs, String topdir) throws IOException {
   }
 
   public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
-    DFSDataInputStream in =
-      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
+    HdfsDataInputStream in = (HdfsDataInputStream)((DistributedFileSystem)fs).open(path);
     in.readByte();
     return in.getCurrentBlock();
   }
 
   public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
       throws IOException {
-    return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
+    return ((HdfsDataInputStream) in).getAllBlocks();
   }
 
   public static Token<BlockTokenIdentifier> getBlockToken(
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
@@ -145,7 +146,7 @@ private String checkFile(FileSystem fileSys, Path name, int repl,
     // need a raw stream
     assertTrue("Not HDFS:"+fileSys.getUri(),
         fileSys instanceof DistributedFileSystem);
-    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
+    HdfsDataInputStream dis = (HdfsDataInputStream)
         ((DistributedFileSystem)fileSys).open(name);
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
     for (LocatedBlock blk : dinfo) { // for each block
@@ -28,7 +28,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.ipc.RemoteException;
@@ -147,7 +147,7 @@ static void checkFile(Path p, int expectedsize, final Configuration conf
 
     final FileSystem fs = DFSTestUtil.getFileSystemAs(ugi, conf);
 
-    final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);
+    final HdfsDataInputStream in = (HdfsDataInputStream)fs.open(p);
 
     //Check visible length
     Assert.assertTrue(in.getVisibleLength() >= expectedsize);
@@ -29,7 +29,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -119,7 +119,7 @@ static void checkFileContent(FileSystem fs, Path name, byte[] expected,
    */
  static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
      int readOffset) throws IOException {
-    DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
+    HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
 
    ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset);
 
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -155,7 +156,7 @@ private long readData(String fname, byte[] buffer, long byteExpected, long begin
     try {
       in = openInputStream(path);
 
-      long visibleLenFromReadStream = getVisibleFileLength(in);
+      long visibleLenFromReadStream = ((HdfsDataInputStream)in).getVisibleLength();
 
       if (visibleLenFromReadStream < byteExpected)
       {
@@ -418,11 +419,6 @@ private long getFileLengthFromNN(Path path) throws IOException {
     return fileStatus.getLen();
   }
 
-  private long getVisibleFileLength(FSDataInputStream in) throws IOException {
-    DFSClient.DFSDataInputStream din = (DFSClient.DFSDataInputStream) in;
-    return din.getVisibleLength();
-  }
-
   private boolean ifExists(Path path) throws IOException {
     return useFCOption ? mfc.util().exists(path) : mfs.exists(path);
   }