HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. (Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1505753 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2013-07-22 18:15:18 +00:00
parent 74906216fe
commit c1314eb2a3
36 changed files with 942 additions and 106 deletions
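
The user-facing surface of this change is two new stream capabilities (CanSetDropBehind and CanSetReadahead, wired through FSDataInputStream and FSDataOutputStream into the DFS streams) plus three dfs.client.cache.* configuration keys. A minimal sketch of the per-stream overrides, assuming a reachable HDFS cluster and a hypothetical path /tmp/example; on filesystems whose streams do not implement these interfaces the calls throw UnsupportedOperationException:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerFileCachingExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/example")); // hypothetical path
        try {
          // Per-stream overrides added by this change; null would mean "use the defaults".
          in.setReadahead(4L * 1024 * 1024);  // 4 MB readahead for this stream only
          in.setDropBehind(Boolean.TRUE);     // drop pages behind this reader
          byte[] buf = new byte[8192];
          in.read(buf);
        } finally {
          in.close();
        }
      }
    }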


@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetDropBehind {
/**
* Configure whether the stream should drop the cache.
*
* @param dropCache Whether to drop the cache. null means to use the
* default value.
* @throws IOException If there was an error changing the dropBehind
* setting.
* @throws UnsupportedOperationException If this stream doesn't support
* setting the drop-behind.
*/
public void setDropBehind(Boolean dropCache)
throws IOException, UnsupportedOperationException;
}


@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetReadahead {
/**
* Set the readahead on this stream.
*
* @param readahead The readahead to use. null means to use the default.
* @throws IOException If there was an error changing the readahead
* setting.
* @throws UnsupportedOperationException If this stream doesn't support
* setting readahead.
*/
public void setReadahead(Long readahead)
throws IOException, UnsupportedOperationException;
}
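
Both interfaces take wrapper types (Boolean, Long) so that null can mean "fall back to the default". An illustrative implementation, not part of this patch, that simply records the requested hints:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.CanSetDropBehind;
    import org.apache.hadoop.fs.CanSetReadahead;

    // Illustrative only: an in-memory stream that merely records the requested hints.
    class RecordingStream extends ByteArrayInputStream
        implements CanSetDropBehind, CanSetReadahead {
      private Boolean dropBehind; // null = use the default
      private Long readahead;     // null = use the default

      RecordingStream(byte[] data) {
        super(data);
      }

      @Override
      public void setDropBehind(Boolean dropCache) throws IOException {
        this.dropBehind = dropCache;
      }

      @Override
      public void setReadahead(Long readahead) throws IOException {
        this.readahead = readahead;
      }
    }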


@ -28,7 +28,8 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
public FSDataInputStream(InputStream in)
throws IOException {
@ -143,4 +144,27 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}
@Override
public void setReadahead(Long readahead)
throws IOException, UnsupportedOperationException {
try {
((CanSetReadahead)in).setReadahead(readahead);
} catch (ClassCastException e) {
throw new UnsupportedOperationException(
"this stream does not support setting the readahead " +
"caching strategy.");
}
}
@Override
public void setDropBehind(Boolean dropBehind)
throws IOException, UnsupportedOperationException {
try {
((CanSetDropBehind)in).setDropBehind(dropBehind);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("this stream does not " +
"support setting the drop-behind caching setting.");
}
}
}
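
FSDataInputStream forwards both calls to the stream it wraps and converts a ClassCastException into UnsupportedOperationException, so callers that must also run over non-HDFS filesystems should treat the feature as optional. A hedged caller-side sketch (helper name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    // Hypothetical helper: request drop-behind only if the wrapped stream supports it.
    final class CachingHints {
      static void tryDropBehind(FSDataInputStream in, Boolean dropBehind) throws IOException {
        try {
          in.setDropBehind(dropBehind);
        } catch (UnsupportedOperationException e) {
          // The wrapped stream (e.g. a local or third-party filesystem) has no such knob; ignore.
        }
      }
    }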


@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FilterOutputStream; import java.io.FilterOutputStream;
import java.io.IOException; import java.io.IOException;
@ -30,7 +29,8 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind {
private final OutputStream wrappedStream;
private static class PositionCache extends FilterOutputStream {
@ -125,4 +125,14 @@ public void hsync() throws IOException {
wrappedStream.flush();
}
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
try {
((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("the wrapped stream does " +
"not support setting the drop-behind caching setting.");
}
}
}
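
On the write path only drop-behind is configurable (there is no readahead for writes). A sketch of using it for a bulk write, with a hypothetical output path; as above, a wrapped stream without the capability raises UnsupportedOperationException:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DropBehindWriter {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path("/tmp/bulk-output")); // hypothetical path
        try {
          out.setDropBehind(Boolean.TRUE); // hint: keep this bulk write out of the page cache
          out.write(new byte[64 * 1024]);
        } finally {
          out.close();
        }
      }
    }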


@ -810,7 +810,8 @@ private static class HarFSDataInputStream extends FSDataInputStream {
/**
* Create an input stream that fakes all the reads/positions/seeking.
*/
private static class HarFsInputStream extends FSInputStream
implements CanSetDropBehind, CanSetReadahead {
private long position, start, end;
//The underlying data input stream that the
// underlying filesystem will return.
@ -958,6 +959,17 @@ public void readFully(long pos, byte[] b) throws IOException {
readFully(pos, b, 0, b.length);
}
@Override
public void setReadahead(Long readahead)
throws IOException, UnsupportedOperationException {
underLyingStream.setReadahead(readahead);
}
@Override
public void setDropBehind(Boolean dropBehind)
throws IOException, UnsupportedOperationException {
underLyingStream.setDropBehind(dropBehind);
}
}
/**


@ -203,7 +203,7 @@ public void run() {
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, off, len,
NativeIO.POSIX.POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {


@ -37,6 +37,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* JNI wrappers for various native IO-related calls not available in Java.
* These functions should generally be used alongside a fallback to another
@ -92,6 +94,9 @@ public static class POSIX {
private static final Log LOG = LogFactory.getLog(NativeIO.class);
@VisibleForTesting
public static CacheTracker cacheTracker = null;
private static boolean nativeLoaded = false;
private static boolean fadvisePossible = true;
private static boolean syncFileRangePossible = true;
@ -102,6 +107,10 @@ public static class POSIX {
private static long cacheTimeout = -1;
public static interface CacheTracker {
public void fadvise(String identifier, long offset, long len, int flags);
}
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
@ -178,9 +187,12 @@ static native void sync_file_range(
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
if (cacheTracker != null) {
cacheTracker.fadvise(identifier, offset, len, flags);
}
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);
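
The new CacheTracker hook makes fadvise activity observable without native code: if the static field is set, every posixFadviseIfPossible call is reported with its identifier (this patch passes the block name). An illustrative tracker, not taken from the patch's tests:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // Illustrative tracker that counts fadvise calls per identifier (the block name here).
    class CountingCacheTracker implements NativeIO.POSIX.CacheTracker {
      final ConcurrentMap<String, AtomicInteger> counts =
          new ConcurrentHashMap<String, AtomicInteger>();

      @Override
      public void fadvise(String identifier, long offset, long len, int flags) {
        counts.putIfAbsent(identifier, new AtomicInteger(0));
        counts.get(identifier).incrementAndGet();
      }
    }

    // Installed via the @VisibleForTesting field shown above:
    //   NativeIO.POSIX.cacheTracker = new CountingCacheTracker();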


@ -258,6 +258,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is
disabled but security is turned on. (Kousuke Saruta via harsh)
HDFS-4817. Make HDFS advisory caching configurable on a per-file basis.
(Colin Patrick McCabe)
OPTIMIZATIONS
BUG FIXES


@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
@ -85,7 +86,8 @@ public static BlockReader newBlockReader(DFSClient.Conf conf,
DomainSocketFactory domSockFactory,
PeerCache peerCache,
FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads,
CachingStrategy cachingStrategy)
throws IOException {
peer.setReadTimeout(conf.socketTimeout);
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
@ -122,12 +124,14 @@ public static BlockReader newBlockReader(DFSClient.Conf conf,
@SuppressWarnings("deprecation")
RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
block, blockToken, startOffset, len, conf.ioBufferSize,
verifyChecksum, clientName, peer, datanodeID, peerCache,
cachingStrategy);
return reader;
} else {
return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len,
verifyChecksum, clientName, peer, datanodeID, peerCache,
cachingStrategy);
}
}


@ -44,6 +44,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@ -133,6 +136,7 @@
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -198,6 +202,8 @@ public class DFSClient implements java.io.Closeable {
private SocketAddress[] localInterfaceAddrs; private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey; private DataEncryptionKey encryptionKey;
private boolean shouldUseLegacyBlockReaderLocal; private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
/** /**
* DFSClient configuration * DFSClient configuration
@ -496,6 +502,16 @@ public DFSClient(URI nameNodeUri, Configuration conf,
} }
this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
this.defaultReadCachingStrategy =
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
} }
/** /**
@ -1949,7 +1965,8 @@ private static Type inferChecksumTypeByReading(
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
DataInputStream in = new DataInputStream(pair.in); DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply = final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
@ -2448,4 +2465,12 @@ public void disableLegacyBlockReaderLocal() {
public boolean useLegacyBlockReaderLocal() { public boolean useLegacyBlockReaderLocal() {
return shouldUseLegacyBlockReaderLocal; return shouldUseLegacyBlockReaderLocal;
} }
public CachingStrategy getDefaultReadCachingStrategy() {
return defaultReadCachingStrategy;
}
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
}


@ -54,6 +54,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
public static final String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
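
The three client keys set the per-client defaults; a key that is left unset yields a null field in the default CachingStrategy, which in turn means "use the DataNode's own configuration". A sketch of setting them programmatically:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ClientCachingConf {
      public static Configuration build() {
        Configuration conf = new Configuration();
        // Explicit client-wide defaults; any key left unset falls back to DataNode behavior.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
        conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 4L * 1024 * 1024); // bytes
        return conf;
      }
    }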


@ -36,6 +36,8 @@
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
@ -50,6 +52,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -65,7 +68,8 @@
* negotiation of the namenode and various datanodes as necessary. * negotiation of the namenode and various datanodes as necessary.
****************************************************************/ ****************************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
@VisibleForTesting @VisibleForTesting
static boolean tcpReadsDisabledForTesting = false; static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache; private final PeerCache peerCache;
@ -80,6 +84,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private LocatedBlock currentLocatedBlock = null; private LocatedBlock currentLocatedBlock = null;
private long pos = 0; private long pos = 0;
private long blockEnd = -1; private long blockEnd = -1;
private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics(); private final ReadStatistics readStatistics = new ReadStatistics();
public static class ReadStatistics { public static class ReadStatistics {
@ -185,6 +190,8 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
this.fileInputStreamCache = new FileInputStreamCache( this.fileInputStreamCache = new FileInputStreamCache(
dfsClient.getConf().shortCircuitStreamsCacheSize, dfsClient.getConf().shortCircuitStreamsCacheSize,
dfsClient.getConf().shortCircuitStreamsCacheExpiryMs); dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy().duplicate();
openInfo(); openInfo();
} }
@ -1035,7 +1042,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads, cachingStrategy);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " + DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@ -1058,7 +1065,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads, cachingStrategy);
return reader; return reader;
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e); DFSClient.LOG.warn("failed to connect to " + domSock, e);
@ -1081,7 +1088,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@ -1100,7 +1108,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy);
} }
@ -1358,4 +1367,30 @@ static class DNAddrPair {
public synchronized ReadStatistics getReadStatistics() { public synchronized ReadStatistics getReadStatistics() {
return new ReadStatistics(readStatistics); return new ReadStatistics(readStatistics);
} }
private synchronized void closeCurrentBlockReader() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
try {
blockReader.close();
} catch (IOException e) {
DFSClient.LOG.error("error closing blockReader", e);
}
blockReader = null;
}
@Override
public synchronized void setReadahead(Long readahead)
throws IOException {
this.cachingStrategy.setReadahead(readahead);
closeCurrentBlockReader();
}
@Override
public synchronized void setDropBehind(Boolean dropBehind)
throws IOException {
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
}
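
Because setReadahead and setDropBehind tear down the current block reader, an override made on an open stream takes effect when the next block reader is created. A hedged sketch of switching strategy mid-stream (file contents and sizes are assumed, not specified by the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class ReadaheadSwitch {
      // Sketch: the override is picked up by the next block reader that gets opened.
      static void randomAccessPhase(FileSystem fs, Path path) throws IOException {
        FSDataInputStream in = fs.open(path);
        try {
          byte[] buf = new byte[4096];
          in.read(buf);                    // served using the client-wide default strategy
          in.setReadahead(0L);             // switching workloads: no readahead...
          in.setDropBehind(Boolean.FALSE); // ...and keep pages cached for re-reads
          in.seek(0);
          in.read(buf);                    // served by a new block reader using the override
        } finally {
          in.close();
        }
      }
    }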


@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -71,6 +72,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
@ -83,6 +85,7 @@
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
@ -115,7 +118,8 @@
* starts sending packets from the dataQueue. * starts sending packets from the dataQueue.
****************************************************************/ ****************************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private final DFSClient dfsClient; private final DFSClient dfsClient;
private Socket s; private Socket s;
@ -147,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private Progressable progress; private Progressable progress;
private final short blockReplication; // replication factor of file private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy;
private static class Packet { private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L; private static final long HEART_BEAT_SEQNO = -1L;
@ -1138,7 +1143,8 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
// send the request // send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy);
// receive ack for connect // receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1336,6 +1342,8 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
this.blockSize = stat.getBlockSize(); this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication(); this.blockReplication = stat.getReplication();
this.progress = progress; this.progress = progress;
this.cachingStrategy =
dfsClient.getDefaultWriteCachingStrategy().duplicate();
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug( DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src); "Set non-null progress callback on DFSOutputStream " + src);
@ -1930,4 +1938,8 @@ synchronized Token<BlockTokenIdentifier> getBlockToken() {
return streamer.getBlockToken(); return streamer.getBlockToken();
} }
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
this.cachingStrategy.setDropBehind(dropBehind);
}
}


@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -381,13 +382,14 @@ public static RemoteBlockReader newBlockReader(String file,
int bufferSize, boolean verifyChecksum, int bufferSize, boolean verifyChecksum,
String clientName, Peer peer, String clientName, Peer peer,
DatanodeID datanodeID, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy)
throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum, cachingStrategy);
// //
// Get bytes in block, set streams // Get bytes in block, set streams


@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -375,12 +376,13 @@ public static BlockReader newBlockReader(String file,
boolean verifyChecksum, boolean verifyChecksum,
String clientName, String clientName,
Peer peer, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum, cachingStrategy);
// //
// Get bytes in block // Get bytes in block


@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -57,13 +58,15 @@ public interface DataTransferProtocol {
* @param length maximum number of bytes for this read. * @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending * @param sendChecksum if false, the DN should skip reading and sending
* checksums * checksums
* @param cachingStrategy The caching strategy to use.
*/ */
public void readBlock(final ExtendedBlock blk, public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken, final Token<BlockTokenIdentifier> blockToken,
final String clientName, final String clientName,
final long blockOffset, final long blockOffset,
final long length, final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException;
/** /**
* Write a block to a datanode pipeline. * Write a block to a datanode pipeline.
@ -89,7 +92,8 @@ public void writeBlock(final ExtendedBlock blk,
final long minBytesRcvd, final long minBytesRcvd,
final long maxBytesRcvd, final long maxBytesRcvd,
final long latestGenerationStamp, final long latestGenerationStamp,
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException;
/** /**
* Transfer a block to another datanode. * Transfer a block to another datanode.


@ -31,8 +31,10 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
/** Receiver */ /** Receiver */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -85,6 +87,14 @@ protected final void processOp(Op op) throws IOException {
} }
} }
static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
Boolean dropBehind = strategy.hasDropBehind() ?
strategy.getDropBehind() : null;
Long readahead = strategy.hasReadahead() ?
strategy.getReadahead() : null;
return new CachingStrategy(dropBehind, readahead);
}
/** Receive OP_READ_BLOCK */ /** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException { private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in)); OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@ -93,7 +103,10 @@ private void opReadBlock() throws IOException {
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
proto.getSendChecksums(),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
} }
/** Receive OP_WRITE_BLOCK */ /** Receive OP_WRITE_BLOCK */
@ -108,7 +121,10 @@ private void opWriteBlock(DataInputStream in) throws IOException {
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
} }
/** Receive {@link Op#TRANSFER_BLOCK} */
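
On the wire the strategy travels as an optional CachingStrategyProto attached to OP_READ_BLOCK and OP_WRITE_BLOCK; an absent field means "not specified" and becomes null on the server. A sketch of that mapping using the conversion logic above (class name is hypothetical):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    final class StrategyRoundTrip {
      // Mirrors the Receiver logic: absent proto fields become null ("use server defaults").
      static CachingStrategy fromProto(CachingStrategyProto proto) {
        Boolean dropBehind = proto.hasDropBehind() ? proto.getDropBehind() : null;
        Long readahead = proto.hasReadahead() ? proto.getReadahead() : null;
        return new CachingStrategy(dropBehind, readahead);
      }

      public static void main(String[] args) {
        CachingStrategyProto unspecified = CachingStrategyProto.newBuilder().build();
        System.out.println(fromProto(unspecified)); // CachingStrategy(dropBehind=null, readahead=null)

        CachingStrategyProto explicit = CachingStrategyProto.newBuilder()
            .setDropBehind(true).setReadahead(1024 * 1024).build();
        System.out.println(fromProto(explicit));    // CachingStrategy(dropBehind=true, readahead=1048576)
      }
    }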


@ -35,9 +35,11 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -72,19 +74,32 @@ private static void send(final DataOutputStream out, final Op opcode,
out.flush(); out.flush();
} }
static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
if (cachingStrategy.getReadahead() != null) {
builder.setReadahead(cachingStrategy.getReadahead().longValue());
}
if (cachingStrategy.getDropBehind() != null) {
builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
}
return builder.build();
}
@Override @Override
public void readBlock(final ExtendedBlock blk, public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken, final Token<BlockTokenIdentifier> blockToken,
final String clientName, final String clientName,
final long blockOffset, final long blockOffset,
final long length, final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder() OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset) .setOffset(blockOffset)
.setLen(length) .setLen(length)
.setSendChecksums(sendChecksum) .setSendChecksums(sendChecksum)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.build(); .build();
send(out, Op.READ_BLOCK, proto); send(out, Op.READ_BLOCK, proto);
@ -102,7 +117,8 @@ public void writeBlock(final ExtendedBlock blk,
final long minBytesRcvd, final long minBytesRcvd,
final long maxBytesRcvd, final long maxBytesRcvd,
final long latestGenerationStamp, final long latestGenerationStamp,
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken); blk, clientName, blockToken);
@ -117,7 +133,8 @@ public void writeBlock(final ExtendedBlock blk,
.setMinBytesRcvd(minBytesRcvd) .setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd) .setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy));
if (source != null) { if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source)); proto.setSource(PBHelper.convertDatanodeInfo(source));


@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@ -217,7 +218,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().getHostAddress(), new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null, addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false, CachingStrategy.newDefaultStrategy());
final byte[] buf = new byte[amtToRead]; final byte[] buf = new byte[amtToRead];
int readOffset = 0; int readOffset = 0;


@ -417,7 +417,7 @@ void verifyBlock(ExtendedBlock block) {
adjustThrottler(); adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, true,
datanode, null, CachingStrategy.newDropBehind());
DataOutputStream out = DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream()); new DataOutputStream(new IOUtils.NullOutputStream());


@ -52,6 +52,8 @@
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
/** A class that receives a block and writes to its own disk, meanwhile /** A class that receives a block and writes to its own disk, meanwhile
* may copies it to another site. If a throttler is provided, * may copies it to another site. If a throttler is provided,
* streaming throttling is also supported. * streaming throttling is also supported.
@ -60,7 +62,8 @@ class BlockReceiver implements Closeable {
public static final Log LOG = DataNode.LOG; public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog; static final Log ClientTraceLog = DataNode.ClientTraceLog;
@VisibleForTesting
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client private DataChecksum clientChecksum; // checksum used by client
@ -96,8 +99,8 @@ class BlockReceiver implements Closeable {
// Cache management state // Cache management state
private boolean dropCacheBehindWrites; private boolean dropCacheBehindWrites;
private long lastCacheManagementOffset = 0;
private boolean syncBehindWrites; private boolean syncBehindWrites;
private long lastCacheDropOffset = 0;
/** The client name. It is empty if a datanode is the client */ /** The client name. It is empty if a datanode is the client */
private final String clientname; private final String clientname;
@ -119,8 +122,8 @@ class BlockReceiver implements Closeable {
final BlockConstructionStage stage, final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd, final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode, final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException {
try{ try{
this.block = block; this.block = block;
this.in = in; this.in = in;
@ -145,6 +148,7 @@ class BlockReceiver implements Closeable {
+ "\n isClient =" + isClient + ", clientname=" + clientname + "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode + "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr + "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ "\n cachingStrategy = " + cachingStrategy
); );
} }
@ -191,7 +195,9 @@ class BlockReceiver implements Closeable {
" while receiving block " + block + " from " + inAddr); " while receiving block " + block + " from " + inAddr);
} }
} }
this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
datanode.getDnConf().dropCacheBehindWrites :
cachingStrategy.getDropBehind();
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
final boolean isCreate = isDatanode || isTransfer final boolean isCreate = isDatanode || isTransfer
@ -597,7 +603,7 @@ private int receivePacket() throws IOException {
datanode.metrics.incrBytesWritten(len); datanode.metrics.incrBytesWritten(len);
manageWriterOsCache(offsetInBlock);
} }
} catch (IOException iex) { } catch (IOException iex) {
datanode.checkDiskError(iex); datanode.checkDiskError(iex);
@ -619,25 +625,44 @@ private int receivePacket() throws IOException {
return lastPacketInBlock?-1:len; return lastPacketInBlock?-1:len;
} }
private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
//
// For SYNC_FILE_RANGE_WRITE, we want to sync from
// lastCacheManagementOffset to a position "two windows ago"
//
//                         <========= sync ===========>
// +-----------------------O--------------------------X
// start                  last                      curPos
// of file
//
if (syncBehindWrites) {
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
}
//
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
// of the file to a position prior to the current position.
//
//                 <=== drop =====>
//                 <---W--->
// +--------------+--------O--------------------------X
// start        dropPos   last                      curPos
// of file
//
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) {
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
}
} catch (Throwable t) {
LOG.warn("Error managing cache for writer of block " + block, t);
}
}
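
A worked example of the two windows above, assuming the default 8 MB lag and illustrative offsets (the numbers are not from the patch):

    // Illustrative numbers only, using the default 8 MB CACHE_DROP_LAG_BYTES.
    public class WriterCacheWindows {
      public static void main(String[] args) {
        long lag = 8L * 1024 * 1024;
        long lastCacheManagementOffset = 16L * 1024 * 1024; // where the previous pass stopped
        long offsetInBlock = 25L * 1024 * 1024;             // current write position (> last + lag)

        // sync_file_range covers everything written since the previous pass:
        long syncStart = lastCacheManagementOffset;                  // 16 MB
        long syncLen = offsetInBlock - lastCacheManagementOffset;    // 9 MB

        // posix_fadvise(DONTNEED) drops from the start of the file up to "two windows ago":
        long dropPos = lastCacheManagementOffset - lag;              // 8 MB

        System.out.println("sync [" + syncStart + ", +" + syncLen + ") drop [0, " + dropPos + ")");
      }
    }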


@ -45,6 +45,7 @@
import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
@ -141,13 +142,22 @@ class BlockSender implements java.io.Closeable {
// Cache-management related fields // Cache-management related fields
private final long readaheadLength; private final long readaheadLength;
private boolean shouldDropCacheBehindRead;
private ReadaheadRequest curReadahead; private ReadaheadRequest curReadahead;
private final boolean alwaysReadahead;
private final boolean dropCacheBehindLargeReads;
private final boolean dropCacheBehindAllReads;
private long lastCacheDropOffset; private long lastCacheDropOffset;
@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
/**
* See {@link BlockSender#isLongRead()}
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
@ -167,16 +177,42 @@ class BlockSender implements java.io.Closeable {
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
try { try {
this.block = block; this.block = block;
this.corruptChecksumOk = corruptChecksumOk; this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum; this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt; this.clientTraceFmt = clientTraceFmt;
/*
* If the client asked for the cache to be dropped behind all reads,
* we honor that. Otherwise, we use the DataNode defaults.
* When using DataNode defaults, we use a heuristic where we only
* drop the cache for large reads.
*/
if (cachingStrategy.getDropBehind() == null) {
this.dropCacheBehindAllReads = false;
this.dropCacheBehindLargeReads =
datanode.getDnConf().dropCacheBehindReads;
} else {
this.dropCacheBehindAllReads =
this.dropCacheBehindLargeReads =
cachingStrategy.getDropBehind().booleanValue();
}
/*
* Similarly, if readahead was explicitly requested, we always do it.
* Otherwise, we read ahead based on the DataNode settings, and only
* when the reads are large.
*/
if (cachingStrategy.getReadahead() == null) {
this.alwaysReadahead = false;
this.readaheadLength = datanode.getDnConf().readaheadLength;
} else {
this.alwaysReadahead = true;
this.readaheadLength = cachingStrategy.getReadahead().longValue();
}
this.datanode = datanode; this.datanode = datanode;
if (verifyChecksum) { if (verifyChecksum) {
@ -335,10 +371,11 @@ class BlockSender implements java.io.Closeable {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (blockInFd != null &&
((dropCacheBehindAllReads) ||
(dropCacheBehindLargeReads && isLongRead()))) {
try {
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Exception e) {
@ -637,7 +674,7 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
@ -705,37 +742,47 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
* and drop-behind. * and drop-behind.
*/ */
private void manageOsCache() throws IOException {
// We can't manage the cache for this block if we don't have a file
// descriptor to work with.
if (blockInFd == null) return;
// Perform readahead if necessary
if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
(alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
// likely to need it again
if (dropCacheBehindAllReads ||
(dropCacheBehindLargeReads && isLongRead())) {
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, dropLength,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}
}
}
/**
* Returns true if we have done a long enough read for this block to qualify
* for the DataNode-wide cache management defaults. We avoid applying the
* cache management defaults to smaller reads because the overhead would be
* too high.
*
* Note that if the client explicitly asked for dropBehind, we will do it
* even on short reads.
*
* This is also used to determine when to invoke
* posix_fadvise(POSIX_FADV_SEQUENTIAL).
*/
private boolean isLongRead() { private boolean isLongRead() {
return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES; return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
} }
/** /**
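
The constructor and manageOsCache() changes above encode a single policy: an explicit per-operation request always applies, while the DataNode-wide defaults are only applied to reads long enough to qualify. A minimal illustrative sketch of that resolution logic, not code from this patch (the class and method names here are invented for illustration):

class DropBehindPolicy {
  // Explicit request (non-null dropBehind in the CachingStrategy) always wins;
  // a null value falls back to the DataNode-wide default, which is applied
  // only when the read is long enough to qualify (isLongRead()).
  static boolean shouldDropBehind(Boolean requested, boolean datanodeDefault,
      boolean isLongRead) {
    if (requested != null) {
      return requested.booleanValue();
    }
    return datanodeDefault && isLongRead;
  }
}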

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
/**
* The caching strategy we should use for an HDFS read or write operation.
*/
public class CachingStrategy {
private Boolean dropBehind; // null = use server defaults
private Long readahead; // null = use server defaults
public static CachingStrategy newDefaultStrategy() {
return new CachingStrategy(null, null);
}
public static CachingStrategy newDropBehind() {
return new CachingStrategy(true, null);
}
public CachingStrategy duplicate() {
return new CachingStrategy(this.dropBehind, this.readahead);
}
public CachingStrategy(Boolean dropBehind, Long readahead) {
this.dropBehind = dropBehind;
this.readahead = readahead;
}
public Boolean getDropBehind() {
return dropBehind;
}
public void setDropBehind(Boolean dropBehind) {
this.dropBehind = dropBehind;
}
public Long getReadahead() {
return readahead;
}
public void setReadahead(Long readahead) {
this.readahead = readahead;
}
public String toString() {
return "CachingStrategy(dropBehind=" + dropBehind +
", readahead=" + readahead + ")";
}
}
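
For reference, a minimal usage sketch of the new class, not part of this patch (the 4 MB readahead value is illustrative). Null fields mean "fall back to the DataNode-wide defaults"; non-null values override them:

import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

public class CachingStrategyExample {
  public static void main(String[] args) {
    // (null, null): defer both decisions to the DataNode configuration.
    CachingStrategy defaults = CachingStrategy.newDefaultStrategy();
    // (true, null): always drop the cache behind this operation.
    CachingStrategy dropBehind = CachingStrategy.newDropBehind();
    // Explicit values: no drop-behind, 4 MB of readahead.
    CachingStrategy custom = new CachingStrategy(false, 4L * 1024 * 1024);
    System.out.println(defaults + " " + dropBehind + " " + custom);
  }
}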

View File

@ -1519,6 +1519,7 @@ private class DataTransfer implements Runnable {
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
     final String clientname;
+    final CachingStrategy cachingStrategy;

     /**
      * Connect to the first item in the target list.  Pass along the
@ -1539,6 +1540,8 @@ private class DataTransfer implements Runnable {
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
       this.clientname = clientname;
+      this.cachingStrategy =
+          new CachingStrategy(true, getDnConf().readaheadLength);
     }

     /**
@ -1581,7 +1584,7 @@ public void run() {
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
-            false, false, true, DataNode.this, null);
+            false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);

         //
@ -1594,7 +1597,7 @@ public void run() {
         }
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum());
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);

         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

View File

@ -299,7 +299,8 @@ public void readBlock(final ExtendedBlock block,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;

     OutputStream baseStream = getOutputStream();
@ -324,7 +325,8 @@ public void readBlock(final ExtendedBlock block,
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, sendChecksum, datanode, clientTraceFmt);
+            true, false, sendChecksum, datanode, clientTraceFmt,
+            cachingStrategy);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e;
         LOG.info(msg);
@ -393,7 +395,8 @@ public void writeBlock(final ExtendedBlock block,
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@ -452,7 +455,8 @@ public void writeBlock(final ExtendedBlock block,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode, requestedChecksum);
+            clientname, srcDataNode, datanode, requestedChecksum,
+            cachingStrategy);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@ -497,7 +501,8 @@ public void writeBlock(final ExtendedBlock block,
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+              cachingStrategy);

           mirrorOut.flush();
@ -715,7 +720,7 @@ public void copyBlock(final ExtendedBlock block,
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
-          null);
+          null, CachingStrategy.newDropBehind());

       // set up response stream
       OutputStream baseStream = getOutputStream();
@ -846,7 +851,8 @@ public void replaceBlock(final ExtendedBlock block,
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum,
+          CachingStrategy.newDropBehind());

       // receive a block
       blockReceiver.receiveBlock(null, null, null, null,

View File

@ -51,6 +51,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@ -569,8 +570,8 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
       blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
           file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
           TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-              getDataEncryptionKey()),
-          chosenNode, null, null, null, false);
+              getDataEncryptionKey()), chosenNode, null, null, null,
+          false, CachingStrategy.newDropBehind());
     } catch (IOException ex) {
       // Put chosen node into dead list, continue

View File

@ -54,11 +54,17 @@ message ClientOperationHeaderProto {
   required string clientName = 2;
 }

+message CachingStrategyProto {
+  optional bool dropBehind = 1;
+  optional int64 readahead = 2;
+}
+
 message OpReadBlockProto {
   required ClientOperationHeaderProto header = 1;
   required uint64 offset = 2;
   required uint64 len = 3;
   optional bool sendChecksums = 4 [default = true];
+  optional CachingStrategyProto cachingStrategy = 5;
 }
@ -100,6 +106,7 @@ message OpWriteBlockProto {
    * The requested checksum mechanism for this block write.
    */
   required ChecksumProto requestedChecksum = 9;
+  optional CachingStrategyProto cachingStrategy = 10;
 }

 message OpTransferBlockProto {
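
On the wire, the strategy travels as the optional CachingStrategyProto added above; absent fields again mean "use the server default". A hedged sketch of building one with the generated protobuf builder API, assuming the message is generated into the existing DataTransferProtos wrapper class (the 4 MB value is illustrative):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;

public class CachingStrategyProtoExample {
  public static void main(String[] args) {
    CachingStrategyProto proto = CachingStrategyProto.newBuilder()
        .setDropBehind(true)             // corresponds to CachingStrategy.dropBehind
        .setReadahead(4L * 1024 * 1024)  // corresponds to CachingStrategy.readahead
        .build();
    System.out.println(proto);
  }
}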

View File

@ -1307,4 +1307,49 @@
  </description>
</property>
<property>
<name>dfs.client.cache.drop.behind.writes</name>
<value></value>
<description>
Just like dfs.datanode.drop.cache.behind.writes, this setting causes the
page cache to be dropped behind HDFS writes, potentially freeing up more
memory for other uses. Unlike dfs.datanode.drop.cache.behind.writes, this
is a client-side setting rather than a setting for the entire datanode.
If present, this setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
<property>
<name>dfs.client.cache.drop.behind.reads</name>
<value></value>
<description>
Just like dfs.datanode.drop.cache.behind.reads, this setting causes the
page cache to be dropped behind HDFS reads, potentially freeing up more
memory for other uses. Unlike dfs.datanode.drop.cache.behind.reads, this
is a client-side setting rather than a setting for the entire datanode. If
present, this setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
<property>
<name>dfs.client.cache.readahead</name>
<value></value>
<description>
Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
read ahead in the block file using posix_fadvise, potentially decreasing
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
setting rather than a setting for the entire datanode. If present, this
setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
</configuration>
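
These three keys give the client process its own defaults; a per-stream call such as setDropBehind() still overrides them. A minimal client-side sketch, not part of this patch (the path and the 4 MB readahead value are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClientCachingDefaultsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Client-wide defaults; when set, they override the DataNode settings.
    conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
    conf.setBoolean("dfs.client.cache.drop.behind.writes", true);
    conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);

    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/caching-example"));
    // A per-stream override beats both the client and the DataNode defaults.
    out.setDropBehind(false);
    out.write(new byte[4096]);
    out.close();
  }
}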

View File

@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
@ -155,7 +156,7 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
         testBlock.getBlockToken(),
         offset, lenToRead,
         true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-        nodes[0], null, null, null, false);
+        nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
   }

   /**

View File

@ -56,6 +56,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@ -186,7 +187,7 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@ -385,7 +386,7 @@ public void testDataTransferProtocol() throws IOException {
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
         0, 0L, 0L, 0L,
-        badChecksum);
+        badChecksum, CachingStrategy.newDefaultStrategy());
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@ -396,7 +397,7 @@ public void testDataTransferProtocol() throws IOException {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());

     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@ -419,7 +420,7 @@ public void testDataTransferProtocol() throws IOException {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());

     hdr = new PacketHeader(
       8,     // size of packet
@ -446,21 +447,21 @@ public void testDataTransferProtocol() throws IOException {
     recvBuf.reset();
     blk.setBlockId(blkid-1);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong block ID " + newBlockId + " for read", false);

     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        -1L, fileLen, true);
+        -1L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative start-offset for read for block " +
         firstBlock.getBlockId(), false);

     // bad block start offset
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        fileLen, fileLen, true);
+        fileLen, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong start-offset for reading block " +
         firstBlock.getBlockId(), false);
@ -477,7 +478,8 @@ public void testDataTransferProtocol() throws IOException {
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, -1L-random.nextInt(oneMil), true);
+        0L, -1L-random.nextInt(oneMil), true,
+        CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative length for reading block " +
         firstBlock.getBlockId(), false);
@ -490,14 +492,14 @@ public void testDataTransferProtocol() throws IOException {
         recvOut);
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen+1, true);
+        0L, fileLen+1, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong length for reading block " +
         firstBlock.getBlockId(), false);

     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     readFile(fileSys, file, fileLen);
   } finally {
     cluster.shutdown();

View File

@ -52,6 +52,7 @@
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.NetUtils;
@ -148,7 +149,8 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
       blockReader = BlockReaderFactory.newBlockReader(
           new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false);
+          nodes[0], null, null, null, false,
+          CachingStrategy.newDefaultStrategy());
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

View File

@ -0,0 +1,369 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCachingStrategy {
private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
private static int MAX_TEST_FILE_LEN = 1024 * 1024;
private static int WRITE_PACKET_SIZE = DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
private final static TestRecordingCacheTracker tracker =
new TestRecordingCacheTracker();
@BeforeClass
public static void setupTest() {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
// Track calls to posix_fadvise.
NativeIO.POSIX.cacheTracker = tracker;
// Normally, we wait for a few megabytes of data to be read or written
// before dropping the cache. This is to avoid an excessive number of
// JNI calls to the posix_fadvise function. However, for the purpose
// of this test, we want to use small files and see all fadvise calls
// happen.
BlockSender.CACHE_DROP_INTERVAL_BYTES = 4096;
BlockReceiver.CACHE_DROP_LAG_BYTES = 4096;
}
private static class Stats {
private final String fileName;
private final boolean dropped[] = new boolean[MAX_TEST_FILE_LEN];
Stats(String fileName) {
this.fileName = fileName;
}
synchronized void fadvise(int offset, int len, int flags) {
LOG.debug("got fadvise(offset=" + offset + ", len=" + len +
",flags=" + flags + ")");
if (flags == NativeIO.POSIX.POSIX_FADV_DONTNEED) {
for (int i = 0; i < (int)len; i++) {
dropped[(int)(offset + i)] = true;
}
}
}
synchronized void assertNotDroppedInRange(int start, int end) {
for (int i = start; i < end; i++) {
if (dropped[i]) {
throw new RuntimeException("in file " + fileName + ", we " +
"dropped the cache at offset " + i);
}
}
}
synchronized void assertDroppedInRange(int start, int end) {
for (int i = start; i < end; i++) {
if (!dropped[i]) {
throw new RuntimeException("in file " + fileName + ", we " +
"did not drop the cache at offset " + i);
}
}
}
synchronized void clear() {
Arrays.fill(dropped, false);
}
}
private static class TestRecordingCacheTracker implements CacheTracker {
private final Map<String, Stats> map = new TreeMap<String, Stats>();
@Override
synchronized public void fadvise(String name,
long offset, long len, int flags) {
if ((len < 0) || (len > Integer.MAX_VALUE)) {
throw new RuntimeException("invalid length of " + len +
" passed to posixFadviseIfPossible");
}
if ((offset < 0) || (offset > Integer.MAX_VALUE)) {
throw new RuntimeException("invalid offset of " + offset +
" passed to posixFadviseIfPossible");
}
Stats stats = map.get(name);
if (stats == null) {
stats = new Stats(name);
map.put(name, stats);
}
stats.fadvise((int)offset, (int)len, flags);
}
synchronized void clear() {
map.clear();
}
synchronized Stats getStats(String fileName) {
return map.get(fileName);
}
synchronized public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("TestRecordingCacheManipulator{");
String prefix = "";
for (String fileName : map.keySet()) {
bld.append(prefix);
prefix = ", ";
bld.append(fileName);
}
bld.append("}");
return bld.toString();
}
}
static void createHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataOutputStream fos = null;
try {
// create file with replication factor of 1
fos = fs.create(p, (short)1);
if (dropBehind != null) {
fos.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? (int)buf.length : (int)length;
fos.write(buf, 0, amt);
length -= amt;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fos != null) {
fos.close();
}
}
}
static long readHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataInputStream fis = null;
long totalRead = 0;
try {
fis = fs.open(p);
if (dropBehind != null) {
fis.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? (int)buf.length : (int)length;
int ret = fis.read(buf, 0, amt);
if (ret == -1) {
return totalRead;
}
totalRead += ret;
length -= ret;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fis != null) {
fis.close();
}
}
throw new RuntimeException("unreachable");
}
@Test(timeout=120000)
public void testFadviseAfterWriteThenRead() throws Exception {
// start a cluster
LOG.info("testFadviseAfterWriteThenRead");
tracker.clear();
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
// verify that we dropped everything from the cache.
Assert.assertNotNull(stats);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/***
* Test the scenario where the DataNode defaults to not dropping the cache,
* but our client defaults are set.
*/
@Test(timeout=120000)
public void testClientDefaults() throws Exception {
// start a cluster
LOG.info("testClientDefaults");
tracker.clear();
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, null);
// verify that we dropped everything from the cache.
Assert.assertNotNull(stats);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=120000)
public void testFadviseSkippedForSmallReads() throws Exception {
// start a cluster
LOG.info("testFadviseSkippedForSmallReads");
tracker.clear();
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
FSDataInputStream fis = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
// Since the DataNode was configured with drop-behind, and we didn't
// specify any policy, we should have done drop-behind.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
stats.assertNotDroppedInRange(0, TEST_PATH_LEN);
// read file
fis = fs.open(new Path(TEST_PATH));
byte buf[] = new byte[17];
fis.readFully(4096, buf, 0, buf.length);
// we should not have dropped anything because of the small read.
stats = tracker.getStats(fadvisedFileName);
stats.assertNotDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
IOUtils.cleanup(null, fis);
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=120000)
public void testNoFadviseAfterWriteThenRead() throws Exception {
// start a cluster
LOG.info("testNoFadviseAfterWriteThenRead");
tracker.clear();
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we did not drop everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
Assert.assertNull(stats);
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
// verify that we dropped everything from the cache.
Assert.assertNull(stats);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -287,7 +287,8 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
     BlockReader blockReader =
       BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
+        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
+        CachingStrategy.newDefaultStrategy());
     blockReader.close();
   }

View File

@ -148,7 +148,7 @@ public void testReplicationError() throws Exception {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum);
+        checksum, CachingStrategy.newDefaultStrategy());
     out.flush();

     // close the connection before sending the content of the block

View File

@ -69,7 +69,7 @@ public void close() throws Exception {
     }
     if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(identifier,
            fd,
            getStartOffset(), getEndOffset() - getStartOffset(),
            NativeIO.POSIX.POSIX_FADV_DONTNEED);

View File

@ -71,7 +71,7 @@ public void releaseExternalResources() {
     }
     if (manageOsCache && getCount() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(identifier,
             fd, getPosition(), getCount(),
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {