HDFS-13702. Remove HTrace hooks from DFSClient to reduce CPU usage. Contributed by Todd Lipcon.
This commit is contained in:
parent
6ba9974108
commit
5d748bd056
@ -3070,25 +3070,6 @@ TraceScope newSrcDstTraceScope(String description, String src, String dst) {
|
|||||||
return scope;
|
return scope;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Full detailed tracing for read requests: path, position in the file,
|
|
||||||
* and length.
|
|
||||||
*
|
|
||||||
* @param reqLen requested length
|
|
||||||
*/
|
|
||||||
TraceScope newReaderTraceScope(String description, String path, long pos,
|
|
||||||
int reqLen) {
|
|
||||||
TraceScope scope = newPathTraceScope(description, path);
|
|
||||||
scope.addKVAnnotation("pos", Long.toString(pos));
|
|
||||||
scope.addKVAnnotation("reqLen", Integer.toString(reqLen));
|
|
||||||
return scope;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add the returned length info to the scope. */
|
|
||||||
void addRetLenToReaderScope(TraceScope scope, int retLen) {
|
|
||||||
scope.addKVAnnotation("retLen", Integer.toString(retLen));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the erasure coding policy information for the specified path
|
* Get the erasure coding policy information for the specified path
|
||||||
*
|
*
|
||||||
|
@ -85,8 +85,6 @@
|
|||||||
import org.apache.hadoop.util.StopWatch;
|
import org.apache.hadoop.util.StopWatch;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.htrace.core.SpanId;
|
import org.apache.htrace.core.SpanId;
|
||||||
import org.apache.htrace.core.TraceScope;
|
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@ -641,7 +639,6 @@ protected BlockReader getBlockReader(LocatedBlock targetBlock,
|
|||||||
setClientCacheContext(dfsClient.getClientContext()).
|
setClientCacheContext(dfsClient.getClientContext()).
|
||||||
setUserGroupInformation(dfsClient.ugi).
|
setUserGroupInformation(dfsClient.ugi).
|
||||||
setConfiguration(dfsClient.getConfiguration()).
|
setConfiguration(dfsClient.getConfiguration()).
|
||||||
setTracer(dfsClient.getTracer()).
|
|
||||||
build();
|
build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -821,31 +818,14 @@ public synchronized int read(@Nonnull final byte buf[], int off, int len)
|
|||||||
}
|
}
|
||||||
ReaderStrategy byteArrayReader =
|
ReaderStrategy byteArrayReader =
|
||||||
new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
|
new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
|
||||||
try (TraceScope scope =
|
return readWithStrategy(byteArrayReader);
|
||||||
dfsClient.newReaderTraceScope("DFSInputStream#byteArrayRead",
|
|
||||||
src, getPos(), len)) {
|
|
||||||
int retLen = readWithStrategy(byteArrayReader);
|
|
||||||
if (retLen < len) {
|
|
||||||
dfsClient.addRetLenToReaderScope(scope, retLen);
|
|
||||||
}
|
|
||||||
return retLen;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(final ByteBuffer buf) throws IOException {
|
public synchronized int read(final ByteBuffer buf) throws IOException {
|
||||||
ReaderStrategy byteBufferReader =
|
ReaderStrategy byteBufferReader =
|
||||||
new ByteBufferStrategy(buf, readStatistics, dfsClient);
|
new ByteBufferStrategy(buf, readStatistics, dfsClient);
|
||||||
int reqLen = buf.remaining();
|
return readWithStrategy(byteBufferReader);
|
||||||
try (TraceScope scope =
|
|
||||||
dfsClient.newReaderTraceScope("DFSInputStream#byteBufferRead",
|
|
||||||
src, getPos(), reqLen)){
|
|
||||||
int retLen = readWithStrategy(byteBufferReader);
|
|
||||||
if (retLen < reqLen) {
|
|
||||||
dfsClient.addRetLenToReaderScope(scope, retLen);
|
|
||||||
}
|
|
||||||
return retLen;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private DNAddrPair chooseDataNode(LocatedBlock block,
|
private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||||
@ -1026,16 +1006,12 @@ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
|
|||||||
final ByteBuffer bb,
|
final ByteBuffer bb,
|
||||||
final CorruptedBlocks corruptedBlocks,
|
final CorruptedBlocks corruptedBlocks,
|
||||||
final int hedgedReadId) {
|
final int hedgedReadId) {
|
||||||
final SpanId parentSpanId = Tracer.getCurrentSpanId();
|
|
||||||
return new Callable<ByteBuffer>() {
|
return new Callable<ByteBuffer>() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer call() throws Exception {
|
public ByteBuffer call() throws Exception {
|
||||||
DFSClientFaultInjector.get().sleepBeforeHedgedGet();
|
DFSClientFaultInjector.get().sleepBeforeHedgedGet();
|
||||||
try (TraceScope ignored = dfsClient.getTracer().
|
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
|
||||||
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
|
return bb;
|
||||||
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
|
|
||||||
return bb;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -1336,16 +1312,8 @@ public int read(long position, byte[] buffer, int offset, int length)
|
|||||||
if (length == 0) {
|
if (length == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
try (TraceScope scope = dfsClient.
|
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
|
||||||
newReaderTraceScope("DFSInputStream#byteArrayPread",
|
return pread(position, bb);
|
||||||
src, position, length)) {
|
|
||||||
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
|
|
||||||
int retLen = pread(position, bb);
|
|
||||||
if (retLen < length) {
|
|
||||||
dfsClient.addRetLenToReaderScope(scope, retLen);
|
|
||||||
}
|
|
||||||
return retLen;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int pread(long position, ByteBuffer buffer)
|
private int pread(long position, ByteBuffer buffer)
|
||||||
|
@ -75,7 +75,6 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -189,11 +188,6 @@ public boolean getSupportsReceiptVerification() {
|
|||||||
*/
|
*/
|
||||||
private Configuration configuration;
|
private Configuration configuration;
|
||||||
|
|
||||||
/**
|
|
||||||
* The HTrace tracer to use.
|
|
||||||
*/
|
|
||||||
private Tracer tracer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Information about the domain socket path we should use to connect to the
|
* Information about the domain socket path we should use to connect to the
|
||||||
* local peer-- or null if we haven't examined the local domain socket.
|
* local peer-- or null if we haven't examined the local domain socket.
|
||||||
@ -298,11 +292,6 @@ public BlockReaderFactory setConfiguration(
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockReaderFactory setTracer(Tracer tracer) {
|
|
||||||
this.tracer = tracer;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static void setFailureInjectorForTesting(FailureInjector injector) {
|
public static void setFailureInjectorForTesting(FailureInjector injector) {
|
||||||
failureInjector = injector;
|
failureInjector = injector;
|
||||||
@ -451,7 +440,7 @@ private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
|||||||
try {
|
try {
|
||||||
return BlockReaderLocalLegacy.newBlockReader(conf,
|
return BlockReaderLocalLegacy.newBlockReader(conf,
|
||||||
userGroupInformation, configuration, fileName, block, token,
|
userGroupInformation, configuration, fileName, block, token,
|
||||||
datanode, startOffset, length, storageType, tracer);
|
datanode, startOffset, length, storageType);
|
||||||
} catch (RemoteException remoteException) {
|
} catch (RemoteException remoteException) {
|
||||||
ioe = remoteException.unwrapRemoteException(
|
ioe = remoteException.unwrapRemoteException(
|
||||||
InvalidToken.class, AccessControlException.class);
|
InvalidToken.class, AccessControlException.class);
|
||||||
@ -509,7 +498,6 @@ private BlockReader getBlockReaderLocal() throws IOException {
|
|||||||
setVerifyChecksum(verifyChecksum).
|
setVerifyChecksum(verifyChecksum).
|
||||||
setCachingStrategy(cachingStrategy).
|
setCachingStrategy(cachingStrategy).
|
||||||
setStorageType(storageType).
|
setStorageType(storageType).
|
||||||
setTracer(tracer).
|
|
||||||
build();
|
build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -860,7 +848,7 @@ private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
|
|||||||
return BlockReaderRemote.newBlockReader(
|
return BlockReaderRemote.newBlockReader(
|
||||||
fileName, block, token, startOffset, length,
|
fileName, block, token, startOffset, length,
|
||||||
verifyChecksum, clientName, peer, datanode,
|
verifyChecksum, clientName, peer, datanode,
|
||||||
clientContext.getPeerCache(), cachingStrategy, tracer,
|
clientContext.getPeerCache(), cachingStrategy,
|
||||||
networkDistance);
|
networkDistance);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,8 +35,6 @@
|
|||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DirectBufferPool;
|
import org.apache.hadoop.util.DirectBufferPool;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.Timer;
|
||||||
import org.apache.htrace.core.TraceScope;
|
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -83,7 +81,6 @@ public static class Builder {
|
|||||||
private long dataPos;
|
private long dataPos;
|
||||||
private ExtendedBlock block;
|
private ExtendedBlock block;
|
||||||
private StorageType storageType;
|
private StorageType storageType;
|
||||||
private Tracer tracer;
|
|
||||||
private ShortCircuitConf shortCircuitConf;
|
private ShortCircuitConf shortCircuitConf;
|
||||||
|
|
||||||
public Builder(ShortCircuitConf conf) {
|
public Builder(ShortCircuitConf conf) {
|
||||||
@ -131,11 +128,6 @@ public Builder setStorageType(StorageType storageType) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setTracer(Tracer tracer) {
|
|
||||||
this.tracer = tracer;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public BlockReaderLocal build() {
|
public BlockReaderLocal build() {
|
||||||
Preconditions.checkNotNull(replica);
|
Preconditions.checkNotNull(replica);
|
||||||
return new BlockReaderLocal(this);
|
return new BlockReaderLocal(this);
|
||||||
@ -244,11 +236,6 @@ public BlockReaderLocal build() {
|
|||||||
*/
|
*/
|
||||||
private StorageType storageType;
|
private StorageType storageType;
|
||||||
|
|
||||||
/**
|
|
||||||
* The Tracer to use.
|
|
||||||
*/
|
|
||||||
private final Tracer tracer;
|
|
||||||
|
|
||||||
private BlockReaderLocal(Builder builder) {
|
private BlockReaderLocal(Builder builder) {
|
||||||
this.replica = builder.replica;
|
this.replica = builder.replica;
|
||||||
this.dataIn = replica.getDataStream().getChannel();
|
this.dataIn = replica.getDataStream().getChannel();
|
||||||
@ -278,7 +265,6 @@ private BlockReaderLocal(Builder builder) {
|
|||||||
}
|
}
|
||||||
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
|
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
|
||||||
this.storageType = builder.storageType;
|
this.storageType = builder.storageType;
|
||||||
this.tracer = builder.tracer;
|
|
||||||
|
|
||||||
if (builder.shortCircuitConf.isScrMetricsEnabled()) {
|
if (builder.shortCircuitConf.isScrMetricsEnabled()) {
|
||||||
metricsInitializationLock.lock();
|
metricsInitializationLock.lock();
|
||||||
@ -360,52 +346,49 @@ private synchronized int drainDataBuf(ByteBuffer buf) {
|
|||||||
*/
|
*/
|
||||||
private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
|
private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
int total = 0;
|
||||||
"BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) {
|
long startDataPos = dataPos;
|
||||||
int total = 0;
|
int startBufPos = buf.position();
|
||||||
long startDataPos = dataPos;
|
while (buf.hasRemaining()) {
|
||||||
int startBufPos = buf.position();
|
int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
|
||||||
while (buf.hasRemaining()) {
|
if (nRead < 0) {
|
||||||
int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
|
break;
|
||||||
if (nRead < 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
dataPos += nRead;
|
|
||||||
total += nRead;
|
|
||||||
}
|
|
||||||
if (canSkipChecksum) {
|
|
||||||
freeChecksumBufIfExists();
|
|
||||||
return total;
|
|
||||||
}
|
|
||||||
if (total > 0) {
|
|
||||||
try {
|
|
||||||
buf.limit(buf.position());
|
|
||||||
buf.position(startBufPos);
|
|
||||||
createChecksumBufIfNeeded();
|
|
||||||
int checksumsNeeded = (total + bytesPerChecksum - 1) /
|
|
||||||
bytesPerChecksum;
|
|
||||||
checksumBuf.clear();
|
|
||||||
checksumBuf.limit(checksumsNeeded * checksumSize);
|
|
||||||
long checksumPos = BlockMetadataHeader.getHeaderSize()
|
|
||||||
+ ((startDataPos / bytesPerChecksum) * checksumSize);
|
|
||||||
while (checksumBuf.hasRemaining()) {
|
|
||||||
int nRead = checksumIn.read(checksumBuf, checksumPos);
|
|
||||||
if (nRead < 0) {
|
|
||||||
throw new IOException("Got unexpected checksum file EOF at " +
|
|
||||||
checksumPos + ", block file position " + startDataPos +
|
|
||||||
" for block " + block + " of file " + filename);
|
|
||||||
}
|
|
||||||
checksumPos += nRead;
|
|
||||||
}
|
|
||||||
checksumBuf.flip();
|
|
||||||
|
|
||||||
checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
|
|
||||||
} finally {
|
|
||||||
buf.position(buf.limit());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
dataPos += nRead;
|
||||||
|
total += nRead;
|
||||||
|
}
|
||||||
|
if (canSkipChecksum) {
|
||||||
|
freeChecksumBufIfExists();
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
if (total > 0) {
|
||||||
|
try {
|
||||||
|
buf.limit(buf.position());
|
||||||
|
buf.position(startBufPos);
|
||||||
|
createChecksumBufIfNeeded();
|
||||||
|
int checksumsNeeded = (total + bytesPerChecksum - 1) /
|
||||||
|
bytesPerChecksum;
|
||||||
|
checksumBuf.clear();
|
||||||
|
checksumBuf.limit(checksumsNeeded * checksumSize);
|
||||||
|
long checksumPos = BlockMetadataHeader.getHeaderSize()
|
||||||
|
+ ((startDataPos / bytesPerChecksum) * checksumSize);
|
||||||
|
while (checksumBuf.hasRemaining()) {
|
||||||
|
int nRead = checksumIn.read(checksumBuf, checksumPos);
|
||||||
|
if (nRead < 0) {
|
||||||
|
throw new IOException("Got unexpected checksum file EOF at " +
|
||||||
|
checksumPos + ", block file position " + startDataPos +
|
||||||
|
" for block " + block + " of file " + filename);
|
||||||
|
}
|
||||||
|
checksumPos += nRead;
|
||||||
|
}
|
||||||
|
checksumBuf.flip();
|
||||||
|
|
||||||
|
checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
|
||||||
|
} finally {
|
||||||
|
buf.position(buf.limit());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean createNoChecksumContext() {
|
private boolean createNoChecksumContext() {
|
||||||
|
@ -51,8 +51,6 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DirectBufferPool;
|
import org.apache.hadoop.util.DirectBufferPool;
|
||||||
import org.apache.htrace.core.TraceScope;
|
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -184,7 +182,6 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
|
|||||||
private long startOffset;
|
private long startOffset;
|
||||||
private final String filename;
|
private final String filename;
|
||||||
private long blockId;
|
private long blockId;
|
||||||
private final Tracer tracer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The only way this object can be instantiated.
|
* The only way this object can be instantiated.
|
||||||
@ -193,8 +190,8 @@ static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
|
|||||||
UserGroupInformation userGroupInformation,
|
UserGroupInformation userGroupInformation,
|
||||||
Configuration configuration, String file, ExtendedBlock blk,
|
Configuration configuration, String file, ExtendedBlock blk,
|
||||||
Token<BlockTokenIdentifier> token, DatanodeInfo node,
|
Token<BlockTokenIdentifier> token, DatanodeInfo node,
|
||||||
long startOffset, long length, StorageType storageType,
|
long startOffset, long length, StorageType storageType)
|
||||||
Tracer tracer) throws IOException {
|
throws IOException {
|
||||||
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
||||||
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
|
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
|
||||||
.getIpcPort());
|
.getIpcPort());
|
||||||
@ -239,11 +236,10 @@ static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
|
|||||||
long firstChunkOffset = startOffset
|
long firstChunkOffset = startOffset
|
||||||
- (startOffset % checksum.getBytesPerChecksum());
|
- (startOffset % checksum.getBytesPerChecksum());
|
||||||
localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
|
localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
|
||||||
startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn,
|
startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn);
|
||||||
tracer);
|
|
||||||
} else {
|
} else {
|
||||||
localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
|
localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
|
||||||
startOffset, dataIn, tracer);
|
startOffset, dataIn);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// remove from cache
|
// remove from cache
|
||||||
@ -320,17 +316,17 @@ private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
||||||
ExtendedBlock block, long startOffset, FileInputStream dataIn,
|
ExtendedBlock block, long startOffset, FileInputStream dataIn)
|
||||||
Tracer tracer) throws IOException {
|
throws IOException {
|
||||||
this(conf, hdfsfile, block, startOffset,
|
this(conf, hdfsfile, block, startOffset,
|
||||||
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
|
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
|
||||||
dataIn, startOffset, null, tracer);
|
dataIn, startOffset, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
||||||
ExtendedBlock block, long startOffset, DataChecksum checksum,
|
ExtendedBlock block, long startOffset, DataChecksum checksum,
|
||||||
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
|
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
|
||||||
FileInputStream checksumIn, Tracer tracer) throws IOException {
|
FileInputStream checksumIn) throws IOException {
|
||||||
this.filename = hdfsfile;
|
this.filename = hdfsfile;
|
||||||
this.checksum = checksum;
|
this.checksum = checksum;
|
||||||
this.verifyChecksum = verifyChecksum;
|
this.verifyChecksum = verifyChecksum;
|
||||||
@ -369,7 +365,6 @@ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
|||||||
bufferPool.returnBuffer(checksumBuff);
|
bufferPool.returnBuffer(checksumBuff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.tracer = tracer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -377,23 +372,20 @@ private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
|
|||||||
*/
|
*/
|
||||||
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
|
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (TraceScope ignored = tracer.
|
int bytesRead = stream.getChannel().read(buf);
|
||||||
newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) {
|
if (bytesRead < 0) {
|
||||||
int bytesRead = stream.getChannel().read(buf);
|
//EOF
|
||||||
if (bytesRead < 0) {
|
return bytesRead;
|
||||||
|
}
|
||||||
|
while (buf.remaining() > 0) {
|
||||||
|
int n = stream.getChannel().read(buf);
|
||||||
|
if (n < 0) {
|
||||||
//EOF
|
//EOF
|
||||||
return bytesRead;
|
return bytesRead;
|
||||||
}
|
}
|
||||||
while (buf.remaining() > 0) {
|
bytesRead += n;
|
||||||
int n = stream.getChannel().read(buf);
|
|
||||||
if (n < 0) {
|
|
||||||
//EOF
|
|
||||||
return bytesRead;
|
|
||||||
}
|
|
||||||
bytesRead += n;
|
|
||||||
}
|
|
||||||
return bytesRead;
|
|
||||||
}
|
}
|
||||||
|
return bytesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -49,11 +49,9 @@
|
|||||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.htrace.core.TraceScope;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.htrace.core.Tracer;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -121,8 +119,6 @@ public class BlockReaderRemote implements BlockReader {
|
|||||||
|
|
||||||
private boolean sentStatusCode = false;
|
private boolean sentStatusCode = false;
|
||||||
|
|
||||||
private final Tracer tracer;
|
|
||||||
|
|
||||||
private final int networkDistance;
|
private final int networkDistance;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -139,10 +135,7 @@ public synchronized int read(byte[] buf, int off, int len)
|
|||||||
|
|
||||||
if (curDataSlice == null ||
|
if (curDataSlice == null ||
|
||||||
curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
readNextPacket();
|
||||||
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
|
|
||||||
readNextPacket();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.trace("Finishing read #{}", randomId);
|
LOG.trace("Finishing read #{}", randomId);
|
||||||
@ -163,10 +156,7 @@ public synchronized int read(byte[] buf, int off, int len)
|
|||||||
public synchronized int read(ByteBuffer buf) throws IOException {
|
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||||
if (curDataSlice == null ||
|
if (curDataSlice == null ||
|
||||||
(curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
|
(curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
readNextPacket();
|
||||||
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
|
|
||||||
readNextPacket();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (curDataSlice.remaining() == 0) {
|
if (curDataSlice.remaining() == 0) {
|
||||||
// we're at EOF now
|
// we're at EOF now
|
||||||
@ -280,7 +270,6 @@ protected BlockReaderRemote(String file, long blockId,
|
|||||||
long startOffset, long firstChunkOffset,
|
long startOffset, long firstChunkOffset,
|
||||||
long bytesToRead, Peer peer,
|
long bytesToRead, Peer peer,
|
||||||
DatanodeID datanodeID, PeerCache peerCache,
|
DatanodeID datanodeID, PeerCache peerCache,
|
||||||
Tracer tracer,
|
|
||||||
int networkDistance) {
|
int networkDistance) {
|
||||||
// Path is used only for printing block and file information in debug
|
// Path is used only for printing block and file information in debug
|
||||||
this.peer = peer;
|
this.peer = peer;
|
||||||
@ -300,7 +289,6 @@ protected BlockReaderRemote(String file, long blockId,
|
|||||||
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
|
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
|
||||||
bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
||||||
checksumSize = this.checksum.getChecksumSize();
|
checksumSize = this.checksum.getChecksumSize();
|
||||||
this.tracer = tracer;
|
|
||||||
this.networkDistance = networkDistance;
|
this.networkDistance = networkDistance;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -397,7 +385,6 @@ public static BlockReader newBlockReader(String file,
|
|||||||
Peer peer, DatanodeID datanodeID,
|
Peer peer, DatanodeID datanodeID,
|
||||||
PeerCache peerCache,
|
PeerCache peerCache,
|
||||||
CachingStrategy cachingStrategy,
|
CachingStrategy cachingStrategy,
|
||||||
Tracer tracer,
|
|
||||||
int networkDistance) throws IOException {
|
int networkDistance) throws IOException {
|
||||||
// in and out will be closed when sock is closed (by the caller)
|
// in and out will be closed when sock is closed (by the caller)
|
||||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||||
@ -431,7 +418,7 @@ public static BlockReader newBlockReader(String file,
|
|||||||
|
|
||||||
return new BlockReaderRemote(file, block.getBlockId(), checksum,
|
return new BlockReaderRemote(file, block.getBlockId(), checksum,
|
||||||
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
|
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
|
||||||
peerCache, tracer, networkDistance);
|
peerCache, networkDistance);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void checkSuccess(
|
static void checkSuccess(
|
||||||
|
@ -128,7 +128,7 @@ private BlockReader createBlockReader(long offsetInBlock) {
|
|||||||
return BlockReaderRemote.newBlockReader(
|
return BlockReaderRemote.newBlockReader(
|
||||||
"dummy", block, blockToken, offsetInBlock,
|
"dummy", block, blockToken, offsetInBlock,
|
||||||
block.getNumBytes() - offsetInBlock, true, "", peer, source,
|
block.getNumBytes() - offsetInBlock, true, "", peer, source,
|
||||||
null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1);
|
null, stripedReader.getCachingStrategy(), -1);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Exception while creating remote block reader, datanode {}",
|
LOG.info("Exception while creating remote block reader, datanode {}",
|
||||||
source, e);
|
source, e);
|
||||||
|
@ -1032,7 +1032,6 @@ private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
|
|||||||
setCachingStrategy(CachingStrategy.newDropBehind()).
|
setCachingStrategy(CachingStrategy.newDropBehind()).
|
||||||
setClientCacheContext(dfs.getClientContext()).
|
setClientCacheContext(dfs.getClientContext()).
|
||||||
setConfiguration(namenode.getConf()).
|
setConfiguration(namenode.getConf()).
|
||||||
setTracer(tracer).
|
|
||||||
setRemotePeerFactory(new RemotePeerFactory() {
|
setRemotePeerFactory(new RemotePeerFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Peer newConnectedPeer(InetSocketAddress addr,
|
public Peer newConnectedPeer(InetSocketAddress addr,
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.ClientContext;
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
@ -206,7 +205,6 @@ public static BlockReader getBlockReader(final DistributedFileSystem fs,
|
|||||||
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
||||||
setConfiguration(fs.getConf()).
|
setConfiguration(fs.getConf()).
|
||||||
setAllowShortCircuitLocalReads(true).
|
setAllowShortCircuitLocalReads(true).
|
||||||
setTracer(FsTracer.get(fs.getConf())).
|
|
||||||
setRemotePeerFactory(new RemotePeerFactory() {
|
setRemotePeerFactory(new RemotePeerFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Peer newConnectedPeer(InetSocketAddress addr,
|
public Peer newConnectedPeer(InetSocketAddress addr,
|
||||||
|
@ -52,7 +52,6 @@
|
|||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
@ -208,7 +207,6 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
|||||||
setShortCircuitReplica(replica).
|
setShortCircuitReplica(replica).
|
||||||
setCachingStrategy(new CachingStrategy(false, readahead)).
|
setCachingStrategy(new CachingStrategy(false, readahead)).
|
||||||
setVerifyChecksum(checksum).
|
setVerifyChecksum(checksum).
|
||||||
setTracer(FsTracer.get(conf)).
|
|
||||||
build();
|
build();
|
||||||
dataIn = null;
|
dataIn = null;
|
||||||
metaIn = null;
|
metaIn = null;
|
||||||
|
@ -33,7 +33,6 @@
|
|||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
@ -167,7 +166,6 @@ protected void tryRead(final Configuration conf, LocatedBlock lblock,
|
|||||||
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
||||||
setClientCacheContext(ClientContext.getFromConf(conf)).
|
setClientCacheContext(ClientContext.getFromConf(conf)).
|
||||||
setConfiguration(conf).
|
setConfiguration(conf).
|
||||||
setTracer(FsTracer.get(conf)).
|
|
||||||
setRemotePeerFactory(new RemotePeerFactory() {
|
setRemotePeerFactory(new RemotePeerFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Peer newConnectedPeer(InetSocketAddress addr,
|
public Peer newConnectedPeer(InetSocketAddress addr,
|
||||||
|
@ -40,7 +40,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.ClientContext;
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
@ -655,7 +654,6 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
|
|||||||
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
setCachingStrategy(CachingStrategy.newDefaultStrategy()).
|
||||||
setClientCacheContext(ClientContext.getFromConf(conf)).
|
setClientCacheContext(ClientContext.getFromConf(conf)).
|
||||||
setConfiguration(conf).
|
setConfiguration(conf).
|
||||||
setTracer(FsTracer.get(conf)).
|
|
||||||
setRemotePeerFactory(new RemotePeerFactory() {
|
setRemotePeerFactory(new RemotePeerFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Peer newConnectedPeer(InetSocketAddress addr,
|
public Peer newConnectedPeer(InetSocketAddress addr,
|
||||||
|
Loading…
Reference in New Issue
Block a user