HADOOP 15688. ABFS: InputStream wrapped in FSDataInputStream twice.

Contributed by Sean Mackrory.
This commit is contained in:
Thomas Marquardt 2018-08-23 20:43:52 +00:00
parent 9c1e4e8139
commit 6b6f8cc2be

View File

@ -19,7 +19,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
@ -50,8 +49,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -251,11 +248,12 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null); isNamespaceEnabled ? getOctalNotation(umask) : null);
final OutputStream outputStream; return new AbfsOutputStream(
outputStream = new FSDataOutputStream( client,
new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); 0,
return outputStream; abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled());
} }
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
@ -273,7 +271,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
isNamespaceEnabled ? getOctalNotation(umask) : null); isNamespaceEnabled ? getOctalNotation(umask) : null);
} }
public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
LOG.debug("openFileForRead filesystem: {} path: {}", LOG.debug("openFileForRead filesystem: {} path: {}",
client.getFileSystem(), client.getFileSystem(),
@ -294,10 +292,9 @@ public InputStream openFileForRead(final Path path, final FileSystem.Statistics
} }
// Add statistics for InputStream // Add statistics for InputStream
return new FSDataInputStream( return new AbfsInputStream(client, statistics,
new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag)); abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag);
} }
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
@ -322,11 +319,12 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
final long offset = overwrite ? 0 : contentLength; final long offset = overwrite ? 0 : contentLength;
final OutputStream outputStream; return new AbfsOutputStream(
outputStream = new FSDataOutputStream( client,
new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset, abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); offset,
return outputStream; abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled());
} }
public void rename(final Path source, final Path destination) throws public void rename(final Path source, final Path destination) throws