HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and DomainSocketWatcher (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1567835 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dab635980d
commit
f0d64a078d
@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
|
||||
@ -48,7 +49,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
|
||||
* See {@link DomainSocket} for more information about UNIX domain sockets.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate("HDFS")
|
||||
public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||
public final class DomainSocketWatcher implements Closeable {
|
||||
static {
|
||||
if (SystemUtils.IS_OS_WINDOWS) {
|
||||
loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
|
||||
@ -281,7 +282,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||
try {
|
||||
processedCond.await();
|
||||
} catch (InterruptedException e) {
|
||||
this.interrupt();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (!toAdd.contains(entry)) {
|
||||
break;
|
||||
@ -308,7 +309,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||
try {
|
||||
processedCond.await();
|
||||
} catch (InterruptedException e) {
|
||||
this.interrupt();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (!toRemove.containsKey(sock.fd)) {
|
||||
break;
|
||||
@ -381,7 +382,8 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
private final Thread watcherThread = new Thread(new Runnable() {
|
||||
@VisibleForTesting
|
||||
final Thread watcherThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info(this + ": starting with interruptCheckPeriodMs = " +
|
||||
@ -443,6 +445,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
|
||||
} catch (IOException e) {
|
||||
LOG.error(toString() + " terminating on IOException", e);
|
||||
} finally {
|
||||
kick(); // allow the handler for notificationSockets[0] to read a byte
|
||||
for (Entry entry : entries.values()) {
|
||||
sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
|
||||
}
|
||||
|
@ -73,9 +73,10 @@ public class TestDomainSocketWatcher {
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testInterruption() throws Exception {
|
||||
DomainSocketWatcher watcher = new DomainSocketWatcher(10);
|
||||
watcher.interrupt();
|
||||
Uninterruptibles.joinUninterruptibly(watcher);
|
||||
final DomainSocketWatcher watcher = new DomainSocketWatcher(10);
|
||||
watcher.watcherThread.interrupt();
|
||||
Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
|
||||
watcher.close();
|
||||
}
|
||||
|
||||
@Test(timeout=300000)
|
||||
|
@ -373,6 +373,9 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-5810. Unify mmap cache and short-circuit file descriptor cache
|
||||
(cmccabe)
|
||||
|
||||
HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and
|
||||
DomainSocketWatcher (cmccabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
|
||||
|
@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
@ -389,7 +388,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||
return null;
|
||||
}
|
||||
ShortCircuitCache cache = clientContext.getShortCircuitCache();
|
||||
Key key = new Key(block.getBlockId(), block.getBlockPoolId());
|
||||
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
||||
InvalidToken exc = info.getInvalidTokenException();
|
||||
if (exc != null) {
|
||||
@ -492,7 +491,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
||||
ShortCircuitReplica replica = null;
|
||||
try {
|
||||
Key key = new Key(block.getBlockId(), block.getBlockPoolId());
|
||||
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||
replica = new ShortCircuitReplica(key, fis[0], fis[1],
|
||||
clientContext.getShortCircuitCache(), Time.monotonicNow());
|
||||
} catch (IOException e) {
|
||||
|
@ -0,0 +1,75 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.commons.lang.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
|
||||
/**
|
||||
* An immutable key which identifies a block.
|
||||
*/
|
||||
final public class ExtendedBlockId {
|
||||
/**
|
||||
* The block ID for this block.
|
||||
*/
|
||||
private final long blockId;
|
||||
|
||||
/**
|
||||
* The block pool ID for this block.
|
||||
*/
|
||||
private final String bpId;
|
||||
|
||||
public ExtendedBlockId(long blockId, String bpId) {
|
||||
this.blockId = blockId;
|
||||
this.bpId = bpId;
|
||||
}
|
||||
|
||||
public long getBlockId() {
|
||||
return this.blockId;
|
||||
}
|
||||
|
||||
public String getBlockPoolId() {
|
||||
return this.bpId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if ((o == null) || (o.getClass() != this.getClass())) {
|
||||
return false;
|
||||
}
|
||||
ExtendedBlockId other = (ExtendedBlockId)o;
|
||||
return new EqualsBuilder().
|
||||
append(blockId, other.blockId).
|
||||
append(bpId, other.bpId).
|
||||
isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().
|
||||
append(this.blockId).
|
||||
append(this.bpId).
|
||||
toHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringBuilder().append(blockId).
|
||||
append("_").append(bpId).toString();
|
||||
}
|
||||
}
|
@ -36,9 +36,9 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
@ -183,8 +183,9 @@ public class ShortCircuitCache implements Closeable {
|
||||
* ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
|
||||
* exception.
|
||||
*/
|
||||
private final HashMap<Key, Waitable<ShortCircuitReplicaInfo>>
|
||||
replicaInfoMap = new HashMap<Key, Waitable<ShortCircuitReplicaInfo>>();
|
||||
private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>>
|
||||
replicaInfoMap = new HashMap<ExtendedBlockId,
|
||||
Waitable<ShortCircuitReplicaInfo>>();
|
||||
|
||||
/**
|
||||
* The CacheCleaner. We don't create this and schedule it until it becomes
|
||||
@ -566,7 +567,7 @@ public class ShortCircuitCache implements Closeable {
|
||||
* @return Null if no replica could be found or created.
|
||||
* The replica, otherwise.
|
||||
*/
|
||||
public ShortCircuitReplicaInfo fetchOrCreate(Key key,
|
||||
public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
|
||||
ShortCircuitReplicaCreator creator) {
|
||||
Waitable<ShortCircuitReplicaInfo> newWaitable = null;
|
||||
lock.lock();
|
||||
@ -612,7 +613,7 @@ public class ShortCircuitCache implements Closeable {
|
||||
*
|
||||
* @throws RetriableException If the caller needs to retry.
|
||||
*/
|
||||
private ShortCircuitReplicaInfo fetch(Key key,
|
||||
private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
|
||||
Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
|
||||
// Another thread is already in the process of loading this
|
||||
// ShortCircuitReplica. So we simply wait for it to complete.
|
||||
@ -656,7 +657,7 @@ public class ShortCircuitCache implements Closeable {
|
||||
return info;
|
||||
}
|
||||
|
||||
private ShortCircuitReplicaInfo create(Key key,
|
||||
private ShortCircuitReplicaInfo create(ExtendedBlockId key,
|
||||
ShortCircuitReplicaCreator creator,
|
||||
Waitable<ShortCircuitReplicaInfo> newWaitable) {
|
||||
// Handle loading a new replica.
|
||||
@ -805,8 +806,8 @@ public class ShortCircuitCache implements Closeable {
|
||||
@VisibleForTesting // ONLY for testing
|
||||
public interface CacheVisitor {
|
||||
void visit(int numOutstandingMmaps,
|
||||
Map<Key, ShortCircuitReplica> replicas,
|
||||
Map<Key, InvalidToken> failedLoads,
|
||||
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
|
||||
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
||||
Map<Long, ShortCircuitReplica> evictable,
|
||||
Map<Long, ShortCircuitReplica> evictableMmapped);
|
||||
}
|
||||
@ -815,11 +816,11 @@ public class ShortCircuitCache implements Closeable {
|
||||
public void accept(CacheVisitor visitor) {
|
||||
lock.lock();
|
||||
try {
|
||||
Map<Key, ShortCircuitReplica> replicas =
|
||||
new HashMap<Key, ShortCircuitReplica>();
|
||||
Map<Key, InvalidToken> failedLoads =
|
||||
new HashMap<Key, InvalidToken>();
|
||||
for (Entry<Key, Waitable<ShortCircuitReplicaInfo>> entry :
|
||||
Map<ExtendedBlockId, ShortCircuitReplica> replicas =
|
||||
new HashMap<ExtendedBlockId, ShortCircuitReplica>();
|
||||
Map<ExtendedBlockId, InvalidToken> failedLoads =
|
||||
new HashMap<ExtendedBlockId, InvalidToken>();
|
||||
for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
|
||||
replicaInfoMap.entrySet()) {
|
||||
Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
|
||||
if (waitable.hasVal()) {
|
||||
@ -839,13 +840,13 @@ public class ShortCircuitCache implements Closeable {
|
||||
append("with outstandingMmapCount=").append(outstandingMmapCount).
|
||||
append(", replicas=");
|
||||
String prefix = "";
|
||||
for (Entry<Key, ShortCircuitReplica> entry : replicas.entrySet()) {
|
||||
for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
|
||||
builder.append(prefix).append(entry.getValue());
|
||||
prefix = ",";
|
||||
}
|
||||
prefix = "";
|
||||
builder.append(", failedLoads=");
|
||||
for (Entry<Key, InvalidToken> entry : failedLoads.entrySet()) {
|
||||
for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
|
||||
builder.append(prefix).append(entry.getValue());
|
||||
prefix = ",";
|
||||
}
|
||||
|
@ -25,10 +25,9 @@ import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
|
||||
import org.apache.commons.lang.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
@ -49,65 +48,10 @@ import com.google.common.base.Preconditions;
|
||||
public class ShortCircuitReplica {
|
||||
public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
|
||||
|
||||
/**
|
||||
* Immutable class which identifies a ShortCircuitReplica object.
|
||||
*/
|
||||
public static final class Key {
|
||||
public Key(long blockId, String bpId) {
|
||||
this.blockId = blockId;
|
||||
this.bpId = bpId;
|
||||
}
|
||||
|
||||
public long getBlockId() {
|
||||
return this.blockId;
|
||||
}
|
||||
|
||||
public String getBlockPoolId() {
|
||||
return this.bpId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if ((o == null) || (o.getClass() != this.getClass())) {
|
||||
return false;
|
||||
}
|
||||
Key other = (Key)o;
|
||||
return new EqualsBuilder().
|
||||
append(blockId, other.blockId).
|
||||
append(bpId, other.bpId).
|
||||
isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().
|
||||
append(this.blockId).
|
||||
append(this.bpId).
|
||||
toHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringBuilder().append(blockId).
|
||||
append("_").append(bpId).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* The block ID for this BlockDescriptors object.
|
||||
*/
|
||||
private final long blockId;
|
||||
|
||||
/**
|
||||
* The block pool ID for this BlockDescriptors object.
|
||||
*/
|
||||
private final String bpId;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Identifies this ShortCircuitReplica object.
|
||||
*/
|
||||
final Key key;
|
||||
final ExtendedBlockId key;
|
||||
|
||||
/**
|
||||
* The block data input stream.
|
||||
@ -168,7 +112,7 @@ public class ShortCircuitReplica {
|
||||
*/
|
||||
private Long evictableTimeNs = null;
|
||||
|
||||
public ShortCircuitReplica(Key key,
|
||||
public ShortCircuitReplica(ExtendedBlockId key,
|
||||
FileInputStream dataStream, FileInputStream metaStream,
|
||||
ShortCircuitCache cache, long creationTimeMs) throws IOException {
|
||||
this.key = key;
|
||||
@ -262,7 +206,7 @@ public class ShortCircuitReplica {
|
||||
return metaHeader;
|
||||
}
|
||||
|
||||
public Key getKey() {
|
||||
public ExtendedBlockId getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
|
@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -56,43 +56,6 @@ import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class FsDatasetCache {
|
||||
/**
|
||||
* Keys which identify MappableBlocks.
|
||||
*/
|
||||
private static final class Key {
|
||||
/**
|
||||
* Block id.
|
||||
*/
|
||||
final long id;
|
||||
|
||||
/**
|
||||
* Block pool id.
|
||||
*/
|
||||
final String bpid;
|
||||
|
||||
Key(long id, String bpid) {
|
||||
this.id = id;
|
||||
this.bpid = bpid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(o.getClass() == getClass())) {
|
||||
return false;
|
||||
}
|
||||
Key other = (Key)o;
|
||||
return ((other.id == this.id) && (other.bpid.equals(this.bpid)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().append(id).append(bpid).hashCode();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* MappableBlocks that we know about.
|
||||
*/
|
||||
@ -143,7 +106,8 @@ public class FsDatasetCache {
|
||||
/**
|
||||
* Stores MappableBlock objects and the states they're in.
|
||||
*/
|
||||
private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
|
||||
private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
|
||||
new HashMap<ExtendedBlockId, Value>();
|
||||
|
||||
private final AtomicLong numBlocksCached = new AtomicLong(0);
|
||||
|
||||
@ -260,12 +224,12 @@ public class FsDatasetCache {
|
||||
*/
|
||||
synchronized List<Long> getCachedBlocks(String bpid) {
|
||||
List<Long> blocks = new ArrayList<Long>();
|
||||
for (Iterator<Entry<Key, Value>> iter =
|
||||
for (Iterator<Entry<ExtendedBlockId, Value>> iter =
|
||||
mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Entry<Key, Value> entry = iter.next();
|
||||
if (entry.getKey().bpid.equals(bpid)) {
|
||||
Entry<ExtendedBlockId, Value> entry = iter.next();
|
||||
if (entry.getKey().getBlockPoolId().equals(bpid)) {
|
||||
if (entry.getValue().state.shouldAdvertise()) {
|
||||
blocks.add(entry.getKey().id);
|
||||
blocks.add(entry.getKey().getBlockId());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -278,7 +242,7 @@ public class FsDatasetCache {
|
||||
synchronized void cacheBlock(long blockId, String bpid,
|
||||
String blockFileName, long length, long genstamp,
|
||||
Executor volumeExecutor) {
|
||||
Key key = new Key(blockId, bpid);
|
||||
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
|
||||
Value prevValue = mappableBlockMap.get(key);
|
||||
if (prevValue != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -299,7 +263,7 @@ public class FsDatasetCache {
|
||||
}
|
||||
|
||||
synchronized void uncacheBlock(String bpid, long blockId) {
|
||||
Key key = new Key(blockId, bpid);
|
||||
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
|
||||
Value prevValue = mappableBlockMap.get(key);
|
||||
|
||||
if (prevValue == null) {
|
||||
@ -344,12 +308,12 @@ public class FsDatasetCache {
|
||||
* Background worker that mmaps, mlocks, and checksums a block
|
||||
*/
|
||||
private class CachingTask implements Runnable {
|
||||
private final Key key;
|
||||
private final ExtendedBlockId key;
|
||||
private final String blockFileName;
|
||||
private final long length;
|
||||
private final long genstamp;
|
||||
|
||||
CachingTask(Key key, String blockFileName, long length, long genstamp) {
|
||||
CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) {
|
||||
this.key = key;
|
||||
this.blockFileName = blockFileName;
|
||||
this.length = length;
|
||||
@ -361,13 +325,13 @@ public class FsDatasetCache {
|
||||
boolean success = false;
|
||||
FileInputStream blockIn = null, metaIn = null;
|
||||
MappableBlock mappableBlock = null;
|
||||
ExtendedBlock extBlk =
|
||||
new ExtendedBlock(key.bpid, key.id, length, genstamp);
|
||||
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
|
||||
key.getBlockId(), length, genstamp);
|
||||
long newUsedBytes = usedBytesCount.reserve(length);
|
||||
if (newUsedBytes < 0) {
|
||||
LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
|
||||
": could not reserve " + length + " more bytes in the " +
|
||||
"cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
|
||||
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
|
||||
" more bytes in the cache: " +
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
|
||||
" of " + maxBytes + " exceeded.");
|
||||
numBlocksFailedToCache.incrementAndGet();
|
||||
return;
|
||||
@ -378,16 +342,15 @@ public class FsDatasetCache {
|
||||
metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
|
||||
.getWrappedStream();
|
||||
} catch (ClassCastException e) {
|
||||
LOG.warn("Failed to cache block with id " + key.id + ", pool " +
|
||||
key.bpid + ": Underlying blocks are not backed by files.", e);
|
||||
LOG.warn("Failed to cache " + key +
|
||||
": Underlying blocks are not backed by files.", e);
|
||||
return;
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.info("Failed to cache block with id " + key.id + ", pool " +
|
||||
key.bpid + ": failed to find backing files.");
|
||||
LOG.info("Failed to cache " + key + ": failed to find backing " +
|
||||
"files.");
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to cache block with id " + key.id + ", pool " +
|
||||
key.bpid + ": failed to open file", e);
|
||||
LOG.warn("Failed to cache " + key + ": failed to open file", e);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
@ -395,11 +358,10 @@ public class FsDatasetCache {
|
||||
load(length, blockIn, metaIn, blockFileName);
|
||||
} catch (ChecksumException e) {
|
||||
// Exception message is bogus since this wasn't caused by a file read
|
||||
LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " +
|
||||
"checksum verification failed.");
|
||||
LOG.warn("Failed to cache " + key + ": checksum verification failed.");
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e);
|
||||
LOG.warn("Failed to cache " + key, e);
|
||||
return;
|
||||
}
|
||||
synchronized (FsDatasetCache.this) {
|
||||
@ -409,15 +371,14 @@ public class FsDatasetCache {
|
||||
value.state == State.CACHING_CANCELLED);
|
||||
if (value.state == State.CACHING_CANCELLED) {
|
||||
mappableBlockMap.remove(key);
|
||||
LOG.warn("Caching of block " + key.id + " in " + key.bpid +
|
||||
" was cancelled.");
|
||||
LOG.warn("Caching of " + key + " was cancelled.");
|
||||
return;
|
||||
}
|
||||
mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
|
||||
". We are now caching " + newUsedBytes + " bytes in total.");
|
||||
LOG.debug("Successfully cached " + key + ". We are now caching " +
|
||||
newUsedBytes + " bytes in total.");
|
||||
}
|
||||
numBlocksCached.addAndGet(1);
|
||||
success = true;
|
||||
@ -425,9 +386,8 @@ public class FsDatasetCache {
|
||||
if (!success) {
|
||||
newUsedBytes = usedBytesCount.release(length);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Caching of block " + key.id + " in " +
|
||||
key.bpid + " was aborted. We are now caching only " +
|
||||
newUsedBytes + " + bytes in total.");
|
||||
LOG.debug("Caching of " + key + " was aborted. We are now " +
|
||||
"caching only " + newUsedBytes + " + bytes in total.");
|
||||
}
|
||||
IOUtils.closeQuietly(blockIn);
|
||||
IOUtils.closeQuietly(metaIn);
|
||||
@ -445,9 +405,9 @@ public class FsDatasetCache {
|
||||
}
|
||||
|
||||
private class UncachingTask implements Runnable {
|
||||
private final Key key;
|
||||
private final ExtendedBlockId key;
|
||||
|
||||
UncachingTask(Key key) {
|
||||
UncachingTask(ExtendedBlockId key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@ -470,8 +430,8 @@ public class FsDatasetCache {
|
||||
usedBytesCount.release(value.mappableBlock.getLength());
|
||||
numBlocksCached.addAndGet(-1);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
|
||||
" completed. usedBytes = " + newUsedBytes);
|
||||
LOG.debug("Uncaching of " + key + " completed. " +
|
||||
"usedBytes = " + newUsedBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.ClientContext;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
@ -275,8 +274,8 @@ public class TestEnhancedByteBufferAccess {
|
||||
|
||||
@Override
|
||||
public void visit(int numOutstandingMmaps,
|
||||
Map<Key, ShortCircuitReplica> replicas,
|
||||
Map<Key, InvalidToken> failedLoads,
|
||||
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
|
||||
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
||||
Map<Long, ShortCircuitReplica> evictable,
|
||||
Map<Long, ShortCircuitReplica> evictableMmapped) {
|
||||
if (expectedNumOutstandingMmaps >= 0) {
|
||||
@ -341,12 +340,12 @@ public class TestEnhancedByteBufferAccess {
|
||||
cache.accept(new CacheVisitor() {
|
||||
@Override
|
||||
public void visit(int numOutstandingMmaps,
|
||||
Map<Key, ShortCircuitReplica> replicas,
|
||||
Map<Key, InvalidToken> failedLoads,
|
||||
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
|
||||
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
||||
Map<Long, ShortCircuitReplica> evictable,
|
||||
Map<Long, ShortCircuitReplica> evictableMmapped) {
|
||||
ShortCircuitReplica replica = replicas.get(
|
||||
new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
|
||||
new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
|
||||
Assert.assertNotNull(replica);
|
||||
Assert.assertTrue(replica.hasMmap());
|
||||
// The replica should not yet be evictable, since we have it open.
|
||||
@ -378,8 +377,8 @@ public class TestEnhancedByteBufferAccess {
|
||||
cache.accept(new CacheVisitor() {
|
||||
@Override
|
||||
public void visit(int numOutstandingMmaps,
|
||||
Map<Key, ShortCircuitReplica> replicas,
|
||||
Map<Key, InvalidToken> failedLoads,
|
||||
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
|
||||
Map<ExtendedBlockId, InvalidToken> failedLoads,
|
||||
Map<Long, ShortCircuitReplica> evictable,
|
||||
Map<Long, ShortCircuitReplica> evictableMmapped) {
|
||||
finished.setValue(evictableMmapped.isEmpty());
|
||||
|
@ -32,14 +32,12 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
@ -170,7 +168,7 @@ public class TestBlockReaderLocal {
|
||||
};
|
||||
dataIn = streams[0];
|
||||
metaIn = streams[1];
|
||||
Key key = new Key(block.getBlockId(), block.getBlockPoolId());
|
||||
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||
ShortCircuitReplica replica = new ShortCircuitReplica(
|
||||
key, dataIn, metaIn, shortCircuitCache, Time.now());
|
||||
blockReaderLocal = new BlockReaderLocal.Builder(
|
||||
|
@ -20,17 +20,13 @@ package org.apache.hadoop.hdfs;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.client.ClientMmap;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
|
||||
import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -44,7 +40,6 @@ import java.io.DataOutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestShortCircuitCache {
|
||||
static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
|
||||
@ -105,7 +100,7 @@ public class TestShortCircuitCache {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
try {
|
||||
Key key = new Key(blockId, "test_bp1");
|
||||
ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1");
|
||||
return new ShortCircuitReplicaInfo(
|
||||
new ShortCircuitReplica(key,
|
||||
pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
|
||||
@ -129,14 +124,14 @@ public class TestShortCircuitCache {
|
||||
new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000);
|
||||
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
|
||||
ShortCircuitReplicaInfo replicaInfo1 =
|
||||
cache.fetchOrCreate(new Key(123, "test_bp1"),
|
||||
cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
|
||||
new SimpleReplicaCreator(123, cache, pair));
|
||||
Preconditions.checkNotNull(replicaInfo1.getReplica());
|
||||
Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
|
||||
pair.compareWith(replicaInfo1.getReplica().getDataStream(),
|
||||
replicaInfo1.getReplica().getMetaStream());
|
||||
ShortCircuitReplicaInfo replicaInfo2 =
|
||||
cache.fetchOrCreate(new Key(123, "test_bp1"),
|
||||
cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
|
||||
new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
@ -157,7 +152,7 @@ public class TestShortCircuitCache {
|
||||
// really long here)
|
||||
ShortCircuitReplicaInfo replicaInfo3 =
|
||||
cache.fetchOrCreate(
|
||||
new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
Assert.fail("expected to use existing entry.");
|
||||
@ -179,7 +174,7 @@ public class TestShortCircuitCache {
|
||||
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
|
||||
ShortCircuitReplicaInfo replicaInfo1 =
|
||||
cache.fetchOrCreate(
|
||||
new Key(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
|
||||
new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
|
||||
Preconditions.checkNotNull(replicaInfo1.getReplica());
|
||||
Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
|
||||
pair.compareWith(replicaInfo1.getReplica().getDataStream(),
|
||||
@ -190,7 +185,7 @@ public class TestShortCircuitCache {
|
||||
Thread.sleep(10);
|
||||
ShortCircuitReplicaInfo replicaInfo2 =
|
||||
cache.fetchOrCreate(
|
||||
new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
triedToCreate.setValue(true);
|
||||
@ -221,7 +216,7 @@ public class TestShortCircuitCache {
|
||||
};
|
||||
for (int i = 0; i < pairs.length; i++) {
|
||||
replicaInfos[i] = cache.fetchOrCreate(
|
||||
new Key(i, "test_bp1"),
|
||||
new ExtendedBlockId(i, "test_bp1"),
|
||||
new SimpleReplicaCreator(i, cache, pairs[i]));
|
||||
Preconditions.checkNotNull(replicaInfos[i].getReplica());
|
||||
Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
|
||||
@ -237,7 +232,7 @@ public class TestShortCircuitCache {
|
||||
for (int i = 1; i < pairs.length; i++) {
|
||||
final Integer iVal = new Integer(i);
|
||||
replicaInfos[i] = cache.fetchOrCreate(
|
||||
new Key(i, "test_bp1"),
|
||||
new ExtendedBlockId(i, "test_bp1"),
|
||||
new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
@ -253,7 +248,7 @@ public class TestShortCircuitCache {
|
||||
// The first (oldest) replica should not be cached.
|
||||
final MutableBoolean calledCreate = new MutableBoolean(false);
|
||||
replicaInfos[0] = cache.fetchOrCreate(
|
||||
new Key(0, "test_bp1"),
|
||||
new ExtendedBlockId(0, "test_bp1"),
|
||||
new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
@ -289,7 +284,7 @@ public class TestShortCircuitCache {
|
||||
final long HOUR_IN_MS = 60 * 60 * 1000;
|
||||
for (int i = 0; i < pairs.length; i++) {
|
||||
final Integer iVal = new Integer(i);
|
||||
final Key key = new Key(i, "test_bp1");
|
||||
final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
|
||||
replicaInfos[i] = cache.fetchOrCreate(key,
|
||||
new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
@ -316,7 +311,7 @@ public class TestShortCircuitCache {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
ShortCircuitReplicaInfo info = cache.fetchOrCreate(
|
||||
new Key(0, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
return null;
|
||||
@ -332,7 +327,7 @@ public class TestShortCircuitCache {
|
||||
|
||||
// Make sure that second replica did not go stale.
|
||||
ShortCircuitReplicaInfo info = cache.fetchOrCreate(
|
||||
new Key(1, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() {
|
||||
@Override
|
||||
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
Assert.fail("second replica went stale, despite 1 " +
|
||||
|
Loading…
x
Reference in New Issue
Block a user