HDFS-7790. Do not create optional fields in DFSInputStream unless they are needed (cmccabe)
This commit is contained in:
parent
46b6d23e8f
commit
871cb56152
@ -631,6 +631,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
HDFS-7684. The host:port settings of the daemons should be trimmed before
|
HDFS-7684. The host:port settings of the daemons should be trimmed before
|
||||||
use. (Anu Engineer via aajisaka)
|
use. (Anu Engineer via aajisaka)
|
||||||
|
|
||||||
|
HDFS-7790. Do not create optional fields in DFSInputStream unless they are
|
||||||
|
needed (cmccabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
@ -127,8 +127,15 @@ public class DFSInputStream extends FSInputStream
|
|||||||
* The value type can be either ByteBufferPool or ClientMmap, depending on
|
* The value type can be either ByteBufferPool or ClientMmap, depending on
|
||||||
* whether we this is a memory-mapped buffer or not.
|
* whether we this is a memory-mapped buffer or not.
|
||||||
*/
|
*/
|
||||||
private final IdentityHashStore<ByteBuffer, Object>
|
private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers;
|
||||||
|
|
||||||
|
private synchronized IdentityHashStore<ByteBuffer, Object>
|
||||||
|
getExtendedReadBuffers() {
|
||||||
|
if (extendedReadBuffers == null) {
|
||||||
extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
|
extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
|
||||||
|
}
|
||||||
|
return extendedReadBuffers;
|
||||||
|
}
|
||||||
|
|
||||||
public static class ReadStatistics {
|
public static class ReadStatistics {
|
||||||
public ReadStatistics() {
|
public ReadStatistics() {
|
||||||
@ -236,7 +243,7 @@ void clear() {
|
|||||||
private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
|
private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
|
||||||
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
|
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
|
||||||
|
|
||||||
private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
|
private byte[] oneByteBuf; // used for 'int read()'
|
||||||
|
|
||||||
void addToDeadNodes(DatanodeInfo dnInfo) {
|
void addToDeadNodes(DatanodeInfo dnInfo) {
|
||||||
deadNodes.put(dnInfo, dnInfo);
|
deadNodes.put(dnInfo, dnInfo);
|
||||||
@ -670,7 +677,7 @@ public synchronized void close() throws IOException {
|
|||||||
}
|
}
|
||||||
dfsClient.checkOpen();
|
dfsClient.checkOpen();
|
||||||
|
|
||||||
if (!extendedReadBuffers.isEmpty()) {
|
if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
|
||||||
final StringBuilder builder = new StringBuilder();
|
final StringBuilder builder = new StringBuilder();
|
||||||
extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
|
extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
|
||||||
private String prefix = "";
|
private String prefix = "";
|
||||||
@ -690,6 +697,9 @@ public void accept(ByteBuffer k, Object v) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read() throws IOException {
|
public synchronized int read() throws IOException {
|
||||||
|
if (oneByteBuf == null) {
|
||||||
|
oneByteBuf = new byte[1];
|
||||||
|
}
|
||||||
int ret = read( oneByteBuf, 0, 1 );
|
int ret = read( oneByteBuf, 0, 1 );
|
||||||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
||||||
}
|
}
|
||||||
@ -1708,7 +1718,7 @@ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
|
|||||||
}
|
}
|
||||||
buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
|
buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
|
||||||
if (buffer != null) {
|
if (buffer != null) {
|
||||||
extendedReadBuffers.put(buffer, bufferPool);
|
getExtendedReadBuffers().put(buffer, bufferPool);
|
||||||
}
|
}
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
@ -1787,7 +1797,7 @@ private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
|
|||||||
buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
|
buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
|
||||||
buffer.position((int)blockPos);
|
buffer.position((int)blockPos);
|
||||||
buffer.limit((int)(blockPos + length));
|
buffer.limit((int)(blockPos + length));
|
||||||
extendedReadBuffers.put(buffer, clientMmap);
|
getExtendedReadBuffers().put(buffer, clientMmap);
|
||||||
synchronized (infoLock) {
|
synchronized (infoLock) {
|
||||||
readStatistics.addZeroCopyBytes(length);
|
readStatistics.addZeroCopyBytes(length);
|
||||||
}
|
}
|
||||||
@ -1808,7 +1818,7 @@ private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
|
|||||||
@Override
|
@Override
|
||||||
public synchronized void releaseBuffer(ByteBuffer buffer) {
|
public synchronized void releaseBuffer(ByteBuffer buffer) {
|
||||||
if (buffer == EMPTY_BUFFER) return;
|
if (buffer == EMPTY_BUFFER) return;
|
||||||
Object val = extendedReadBuffers.remove(buffer);
|
Object val = getExtendedReadBuffers().remove(buffer);
|
||||||
if (val == null) {
|
if (val == null) {
|
||||||
throw new IllegalArgumentException("tried to release a buffer " +
|
throw new IllegalArgumentException("tried to release a buffer " +
|
||||||
"that was not created by this stream, " + buffer);
|
"that was not created by this stream, " + buffer);
|
||||||
|
Loading…
Reference in New Issue
Block a user