HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1552162 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-12-18 23:29:05 +00:00
parent fc966461e0
commit 90122f25e1
5 changed files with 66 additions and 33 deletions

View File

@ -241,9 +241,6 @@ Trunk (Unreleased)
HDFS-5431. Support cachepool-based limit management in path-based caching HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe) (awang via cmccabe)
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
(cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@ -754,6 +751,9 @@ Release 2.4.0 - UNRELEASED
FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
change Some fields in FSEditLog to final. (szetszwo) change Some fields in FSEditLog to final. (szetszwo)
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
(cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -809,6 +809,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion. HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
(Binglin Chang via junping_du) (Binglin Chang via junping_du)
HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -228,7 +228,7 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
dfsClient.getConf().shortCircuitStreamsCacheSize, dfsClient.getConf().shortCircuitStreamsCacheSize,
dfsClient.getConf().shortCircuitStreamsCacheExpiryMs); dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
this.cachingStrategy = this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy().duplicate(); dfsClient.getDefaultReadCachingStrategy();
openInfo(); openInfo();
} }
@ -574,7 +574,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken(); Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
blockReader = getBlockReader(targetAddr, chosenNode, src, blk, blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName); buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
if(connectFailedOnce) { if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr + DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk); " for " + blk);
@ -928,7 +928,11 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
// cached block locations may have been updated by chooseDataNode() // cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the // or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop. // start of the loop.
CachingStrategy curCachingStrategy;
synchronized (this) {
block = getBlockAt(block.getStartOffset(), false); block = getBlockAt(block.getStartOffset(), false);
curCachingStrategy = cachingStrategy;
}
DNAddrPair retval = chooseDataNode(block); DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info; DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr; InetSocketAddress targetAddr = retval.addr;
@ -940,7 +944,7 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
int len = (int) (end - start + 1); int len = (int) (end - start + 1);
reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(), reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
blockToken, start, len, buffersize, verifyChecksum, blockToken, start, len, buffersize, verifyChecksum,
dfsClient.clientName); dfsClient.clientName, curCachingStrategy);
int nread = reader.readAll(buf, offset, len); int nread = reader.readAll(buf, offset, len);
if (nread != len) { if (nread != len) {
throw new IOException("truncated return from reader.read(): " + throw new IOException("truncated return from reader.read(): " +
@ -1053,6 +1057,7 @@ private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
* @param bufferSize The IO buffer size (not the client buffer size) * @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum * @param verifyChecksum Whether to verify checksum
* @param clientName Client name * @param clientName Client name
* @param curCachingStrategy caching strategy to use
* @return New BlockReader instance * @return New BlockReader instance
*/ */
protected BlockReader getBlockReader(InetSocketAddress dnAddr, protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@ -1064,7 +1069,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
long len, long len,
int bufferSize, int bufferSize,
boolean verifyChecksum, boolean verifyChecksum,
String clientName) String clientName,
CachingStrategy curCachingStrategy)
throws IOException { throws IOException {
// Firstly, we check to see if we have cached any file descriptors for // Firstly, we check to see if we have cached any file descriptors for
// local blocks. If so, we can just re-use those file descriptors. // local blocks. If so, we can just re-use those file descriptors.
@ -1084,7 +1090,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
setBlockMetadataHeader(BlockMetadataHeader. setBlockMetadataHeader(BlockMetadataHeader.
preadHeader(fis[1].getChannel())). preadHeader(fis[1].getChannel())).
setFileInputStreamCache(fileInputStreamCache). setFileInputStreamCache(fileInputStreamCache).
setCachingStrategy(cachingStrategy). setCachingStrategy(curCachingStrategy).
build(); build();
} }
@ -1119,7 +1125,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads, cachingStrategy); allowShortCircuitLocalReads, curCachingStrategy);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " + DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@ -1142,7 +1148,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads, cachingStrategy); allowShortCircuitLocalReads, curCachingStrategy);
return reader; return reader;
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e); DFSClient.LOG.warn("failed to connect to " + domSock, e);
@ -1166,7 +1172,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false, dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy); curCachingStrategy);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@ -1186,7 +1192,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false, dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy); curCachingStrategy);
} }
@ -1460,14 +1466,18 @@ private synchronized void closeCurrentBlockReader() {
@Override @Override
public synchronized void setReadahead(Long readahead) public synchronized void setReadahead(Long readahead)
throws IOException { throws IOException {
this.cachingStrategy.setReadahead(readahead); this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).
setReadahead(readahead).build();
closeCurrentBlockReader(); closeCurrentBlockReader();
} }
@Override @Override
public synchronized void setDropBehind(Boolean dropBehind) public synchronized void setDropBehind(Boolean dropBehind)
throws IOException { throws IOException {
this.cachingStrategy.setDropBehind(dropBehind); this.cachingStrategy =
new CachingStrategy.Builder(this.cachingStrategy).
setDropBehind(dropBehind).build();
closeCurrentBlockReader(); closeCurrentBlockReader();
} }

View File

@ -150,7 +150,7 @@ public class DFSOutputStream extends FSOutputSummer
private Progressable progress; private Progressable progress;
private final short blockReplication; // replication factor of file private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy; private AtomicReference<CachingStrategy> cachingStrategy;
private boolean failPacket = false; private boolean failPacket = false;
private static class Packet { private static class Packet {
@ -1183,7 +1183,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy); cachingStrategy.get());
// receive ack for connect // receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1378,8 +1378,8 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
this.blockSize = stat.getBlockSize(); this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication(); this.blockReplication = stat.getReplication();
this.progress = progress; this.progress = progress;
this.cachingStrategy = this.cachingStrategy = new AtomicReference<CachingStrategy>(
dfsClient.getDefaultWriteCachingStrategy().duplicate(); dfsClient.getDefaultWriteCachingStrategy());
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug( DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src); "Set non-null progress callback on DFSOutputStream " + src);
@ -1993,7 +1993,14 @@ synchronized Token<BlockTokenIdentifier> getBlockToken() {
@Override @Override
public void setDropBehind(Boolean dropBehind) throws IOException { public void setDropBehind(Boolean dropBehind) throws IOException {
this.cachingStrategy.setDropBehind(dropBehind); CachingStrategy prevStrategy, nextStrategy;
// CachingStrategy is immutable. So build a new CachingStrategy with the
// modifications we want, and compare-and-swap it in.
do {
prevStrategy = this.cachingStrategy.get();
nextStrategy = new CachingStrategy.Builder(prevStrategy).
setDropBehind(dropBehind).build();
} while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
} }
@VisibleForTesting @VisibleForTesting

View File

@ -21,8 +21,8 @@
* The caching strategy we should use for an HDFS read or write operation. * The caching strategy we should use for an HDFS read or write operation.
*/ */
public class CachingStrategy { public class CachingStrategy {
private Boolean dropBehind; // null = use server defaults private final Boolean dropBehind; // null = use server defaults
private Long readahead; // null = use server defaults private final Long readahead; // null = use server defaults
public static CachingStrategy newDefaultStrategy() { public static CachingStrategy newDefaultStrategy() {
return new CachingStrategy(null, null); return new CachingStrategy(null, null);
@ -32,8 +32,28 @@ public static CachingStrategy newDropBehind() {
return new CachingStrategy(true, null); return new CachingStrategy(true, null);
} }
public CachingStrategy duplicate() { public static class Builder {
return new CachingStrategy(this.dropBehind, this.readahead); private Boolean dropBehind;
private Long readahead;
public Builder(CachingStrategy prev) {
this.dropBehind = prev.dropBehind;
this.readahead = prev.readahead;
}
public Builder setDropBehind(Boolean dropBehind) {
this.dropBehind = dropBehind;
return this;
}
public Builder setReadahead(Long readahead) {
this.readahead = readahead;
return this;
}
public CachingStrategy build() {
return new CachingStrategy(dropBehind, readahead);
}
} }
public CachingStrategy(Boolean dropBehind, Long readahead) { public CachingStrategy(Boolean dropBehind, Long readahead) {
@ -45,18 +65,10 @@ public Boolean getDropBehind() {
return dropBehind; return dropBehind;
} }
public void setDropBehind(Boolean dropBehind) {
this.dropBehind = dropBehind;
}
public Long getReadahead() { public Long getReadahead() {
return readahead; return readahead;
} }
public void setReadahead(Long readahead) {
this.readahead = readahead;
}
public String toString() { public String toString() {
return "CachingStrategy(dropBehind=" + dropBehind + return "CachingStrategy(dropBehind=" + dropBehind +
", readahead=" + readahead + ")"; ", readahead=" + readahead + ")";

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.junit.Assert; import org.junit.Assert;
@ -138,7 +139,8 @@ public void testReadFromOneDN() throws Exception {
Matchers.anyLong(), Matchers.anyLong(),
Matchers.anyInt(), Matchers.anyInt(),
Matchers.anyBoolean(), Matchers.anyBoolean(),
Matchers.anyString()); Matchers.anyString(),
(CachingStrategy)Matchers.anyObject());
// Initial read // Initial read
pread(in, 0, dataBuf, 0, dataBuf.length, authenticData); pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);