diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java index 673129dd98..797f7f2346 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.NativeCodeLoader; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; @@ -48,7 +49,7 @@ import com.google.common.util.concurrent.Uninterruptibles; * See {@link DomainSocket} for more information about UNIX domain sockets. */ @InterfaceAudience.LimitedPrivate("HDFS") -public final class DomainSocketWatcher extends Thread implements Closeable { +public final class DomainSocketWatcher implements Closeable { static { if (SystemUtils.IS_OS_WINDOWS) { loadingFailureReason = "UNIX Domain sockets are not available on Windows."; @@ -281,7 +282,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable { try { processedCond.await(); } catch (InterruptedException e) { - this.interrupt(); + Thread.currentThread().interrupt(); } if (!toAdd.contains(entry)) { break; @@ -308,7 +309,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable { try { processedCond.await(); } catch (InterruptedException e) { - this.interrupt(); + Thread.currentThread().interrupt(); } if (!toRemove.containsKey(sock.fd)) { break; @@ -381,7 +382,8 @@ public final class DomainSocketWatcher extends Thread implements Closeable { } } - private final Thread watcherThread = new Thread(new Runnable() { + @VisibleForTesting + final Thread watcherThread = new Thread(new Runnable() { @Override public void run() { LOG.info(this + ": starting with interruptCheckPeriodMs = " + @@ -443,6 +445,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable { } catch (IOException e) { LOG.error(toString() + " terminating on IOException", e); } finally { + kick(); // allow the handler for notificationSockets[0] to read a byte for (Entry entry : entries.values()) { sendCallback("close", entries, fdSet, entry.getDomainSocket().fd); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java index b9d76cb603..7c5b42de46 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java @@ -73,9 +73,10 @@ public class TestDomainSocketWatcher { */ @Test(timeout=60000) public void testInterruption() throws Exception { - DomainSocketWatcher watcher = new DomainSocketWatcher(10); - watcher.interrupt(); - Uninterruptibles.joinUninterruptibly(watcher); + final DomainSocketWatcher watcher = new DomainSocketWatcher(10); + watcher.watcherThread.interrupt(); + Uninterruptibles.joinUninterruptibly(watcher.watcherThread); + watcher.close(); } @Test(timeout=300000) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e9f915e04c..15026dd4ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -373,6 +373,9 @@ Release 2.4.0 - UNRELEASED HDFS-5810. Unify mmap cache and short-circuit file descriptor cache (cmccabe) + HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and + DomainSocketWatcher (cmccabe) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index f7eb3c7505..09462ef145 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.Peer; @@ -389,7 +388,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { return null; } ShortCircuitCache cache = clientContext.getShortCircuitCache(); - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); InvalidToken exc = info.getInvalidTokenException(); if (exc != null) { @@ -492,7 +491,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { sock.recvFileInputStreams(fis, buf, 0, buf.length); ShortCircuitReplica replica = null; try { - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); replica = new ShortCircuitReplica(key, fis[0], fis[1], clientContext.getShortCircuitCache(), Time.monotonicNow()); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java new file mode 100644 index 0000000000..716ee2c614 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; + +/** + * An immutable key which identifies a block. + */ +final public class ExtendedBlockId { + /** + * The block ID for this block. + */ + private final long blockId; + + /** + * The block pool ID for this block. + */ + private final String bpId; + + public ExtendedBlockId(long blockId, String bpId) { + this.blockId = blockId; + this.bpId = bpId; + } + + public long getBlockId() { + return this.blockId; + } + + public String getBlockPoolId() { + return this.bpId; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + ExtendedBlockId other = (ExtendedBlockId)o; + return new EqualsBuilder(). + append(blockId, other.blockId). + append(bpId, other.bpId). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.blockId). + append(this.bpId). + toHashCode(); + } + + @Override + public String toString() { + return new StringBuilder().append(blockId). + append("_").append(bpId).toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java index 29bff7d797..3aef525b2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java @@ -36,9 +36,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -183,8 +183,9 @@ public class ShortCircuitCache implements Closeable { * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken * exception. */ - private final HashMap> - replicaInfoMap = new HashMap>(); + private final HashMap> + replicaInfoMap = new HashMap>(); /** * The CacheCleaner. We don't create this and schedule it until it becomes @@ -566,7 +567,7 @@ public class ShortCircuitCache implements Closeable { * @return Null if no replica could be found or created. * The replica, otherwise. */ - public ShortCircuitReplicaInfo fetchOrCreate(Key key, + public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, ShortCircuitReplicaCreator creator) { Waitable newWaitable = null; lock.lock(); @@ -612,7 +613,7 @@ public class ShortCircuitCache implements Closeable { * * @throws RetriableException If the caller needs to retry. */ - private ShortCircuitReplicaInfo fetch(Key key, + private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. @@ -656,7 +657,7 @@ public class ShortCircuitCache implements Closeable { return info; } - private ShortCircuitReplicaInfo create(Key key, + private ShortCircuitReplicaInfo create(ExtendedBlockId key, ShortCircuitReplicaCreator creator, Waitable newWaitable) { // Handle loading a new replica. @@ -805,8 +806,8 @@ public class ShortCircuitCache implements Closeable { @VisibleForTesting // ONLY for testing public interface CacheVisitor { void visit(int numOutstandingMmaps, - Map replicas, - Map failedLoads, + Map replicas, + Map failedLoads, Map evictable, Map evictableMmapped); } @@ -815,11 +816,11 @@ public class ShortCircuitCache implements Closeable { public void accept(CacheVisitor visitor) { lock.lock(); try { - Map replicas = - new HashMap(); - Map failedLoads = - new HashMap(); - for (Entry> entry : + Map replicas = + new HashMap(); + Map failedLoads = + new HashMap(); + for (Entry> entry : replicaInfoMap.entrySet()) { Waitable waitable = entry.getValue(); if (waitable.hasVal()) { @@ -839,13 +840,13 @@ public class ShortCircuitCache implements Closeable { append("with outstandingMmapCount=").append(outstandingMmapCount). append(", replicas="); String prefix = ""; - for (Entry entry : replicas.entrySet()) { + for (Entry entry : replicas.entrySet()) { builder.append(prefix).append(entry.getValue()); prefix = ","; } prefix = ""; builder.append(", failedLoads="); - for (Entry entry : failedLoads.entrySet()) { + for (Entry entry : failedLoads.entrySet()) { builder.append(prefix).append(entry.getValue()); prefix = ","; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java index 535c2df6a3..e6137c7b9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java @@ -25,10 +25,9 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; @@ -49,65 +48,10 @@ import com.google.common.base.Preconditions; public class ShortCircuitReplica { public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); - /** - * Immutable class which identifies a ShortCircuitReplica object. - */ - public static final class Key { - public Key(long blockId, String bpId) { - this.blockId = blockId; - this.bpId = bpId; - } - - public long getBlockId() { - return this.blockId; - } - - public String getBlockPoolId() { - return this.bpId; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - Key other = (Key)o; - return new EqualsBuilder(). - append(blockId, other.blockId). - append(bpId, other.bpId). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.blockId). - append(this.bpId). - toHashCode(); - } - - @Override - public String toString() { - return new StringBuilder().append(blockId). - append("_").append(bpId).toString(); - } - - /** - * The block ID for this BlockDescriptors object. - */ - private final long blockId; - - /** - * The block pool ID for this BlockDescriptors object. - */ - private final String bpId; - } - - /** * Identifies this ShortCircuitReplica object. */ - final Key key; + final ExtendedBlockId key; /** * The block data input stream. @@ -168,7 +112,7 @@ public class ShortCircuitReplica { */ private Long evictableTimeNs = null; - public ShortCircuitReplica(Key key, + public ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream, FileInputStream metaStream, ShortCircuitCache cache, long creationTimeMs) throws IOException { this.key = key; @@ -262,7 +206,7 @@ public class ShortCircuitReplica { return metaHeader; } - public Key getKey() { + public ExtendedBlockId getKey() { return key; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index fc77b0570f..7384b1523c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -56,43 +56,6 @@ import org.apache.hadoop.io.nativeio.NativeIO; @InterfaceAudience.Private @InterfaceStability.Unstable public class FsDatasetCache { - /** - * Keys which identify MappableBlocks. - */ - private static final class Key { - /** - * Block id. - */ - final long id; - - /** - * Block pool id. - */ - final String bpid; - - Key(long id, String bpid) { - this.id = id; - this.bpid = bpid; - } - - @Override - public boolean equals(Object o) { - if (o == null) { - return false; - } - if (!(o.getClass() == getClass())) { - return false; - } - Key other = (Key)o; - return ((other.id == this.id) && (other.bpid.equals(this.bpid))); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(id).append(bpid).hashCode(); - } - }; - /** * MappableBlocks that we know about. */ @@ -143,7 +106,8 @@ public class FsDatasetCache { /** * Stores MappableBlock objects and the states they're in. */ - private final HashMap mappableBlockMap = new HashMap(); + private final HashMap mappableBlockMap = + new HashMap(); private final AtomicLong numBlocksCached = new AtomicLong(0); @@ -260,12 +224,12 @@ public class FsDatasetCache { */ synchronized List getCachedBlocks(String bpid) { List blocks = new ArrayList(); - for (Iterator> iter = + for (Iterator> iter = mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) { - Entry entry = iter.next(); - if (entry.getKey().bpid.equals(bpid)) { + Entry entry = iter.next(); + if (entry.getKey().getBlockPoolId().equals(bpid)) { if (entry.getValue().state.shouldAdvertise()) { - blocks.add(entry.getKey().id); + blocks.add(entry.getKey().getBlockId()); } } } @@ -278,7 +242,7 @@ public class FsDatasetCache { synchronized void cacheBlock(long blockId, String bpid, String blockFileName, long length, long genstamp, Executor volumeExecutor) { - Key key = new Key(blockId, bpid); + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); Value prevValue = mappableBlockMap.get(key); if (prevValue != null) { if (LOG.isDebugEnabled()) { @@ -299,7 +263,7 @@ public class FsDatasetCache { } synchronized void uncacheBlock(String bpid, long blockId) { - Key key = new Key(blockId, bpid); + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); Value prevValue = mappableBlockMap.get(key); if (prevValue == null) { @@ -344,12 +308,12 @@ public class FsDatasetCache { * Background worker that mmaps, mlocks, and checksums a block */ private class CachingTask implements Runnable { - private final Key key; + private final ExtendedBlockId key; private final String blockFileName; private final long length; private final long genstamp; - CachingTask(Key key, String blockFileName, long length, long genstamp) { + CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) { this.key = key; this.blockFileName = blockFileName; this.length = length; @@ -361,13 +325,13 @@ public class FsDatasetCache { boolean success = false; FileInputStream blockIn = null, metaIn = null; MappableBlock mappableBlock = null; - ExtendedBlock extBlk = - new ExtendedBlock(key.bpid, key.id, length, genstamp); + ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), + key.getBlockId(), length, genstamp); long newUsedBytes = usedBytesCount.reserve(length); if (newUsedBytes < 0) { - LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + - ": could not reserve " + length + " more bytes in the " + - "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + + LOG.warn("Failed to cache " + key + ": could not reserve " + length + + " more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " of " + maxBytes + " exceeded."); numBlocksFailedToCache.incrementAndGet(); return; @@ -378,16 +342,15 @@ public class FsDatasetCache { metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk) .getWrappedStream(); } catch (ClassCastException e) { - LOG.warn("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": Underlying blocks are not backed by files.", e); + LOG.warn("Failed to cache " + key + + ": Underlying blocks are not backed by files.", e); return; } catch (FileNotFoundException e) { - LOG.info("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": failed to find backing files."); + LOG.info("Failed to cache " + key + ": failed to find backing " + + "files."); return; } catch (IOException e) { - LOG.warn("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": failed to open file", e); + LOG.warn("Failed to cache " + key + ": failed to open file", e); return; } try { @@ -395,11 +358,10 @@ public class FsDatasetCache { load(length, blockIn, metaIn, blockFileName); } catch (ChecksumException e) { // Exception message is bogus since this wasn't caused by a file read - LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " + - "checksum verification failed."); + LOG.warn("Failed to cache " + key + ": checksum verification failed."); return; } catch (IOException e) { - LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e); + LOG.warn("Failed to cache " + key, e); return; } synchronized (FsDatasetCache.this) { @@ -409,15 +371,14 @@ public class FsDatasetCache { value.state == State.CACHING_CANCELLED); if (value.state == State.CACHING_CANCELLED) { mappableBlockMap.remove(key); - LOG.warn("Caching of block " + key.id + " in " + key.bpid + - " was cancelled."); + LOG.warn("Caching of " + key + " was cancelled."); return; } mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED)); } if (LOG.isDebugEnabled()) { - LOG.debug("Successfully cached block " + key.id + " in " + key.bpid + - ". We are now caching " + newUsedBytes + " bytes in total."); + LOG.debug("Successfully cached " + key + ". We are now caching " + + newUsedBytes + " bytes in total."); } numBlocksCached.addAndGet(1); success = true; @@ -425,9 +386,8 @@ public class FsDatasetCache { if (!success) { newUsedBytes = usedBytesCount.release(length); if (LOG.isDebugEnabled()) { - LOG.debug("Caching of block " + key.id + " in " + - key.bpid + " was aborted. We are now caching only " + - newUsedBytes + " + bytes in total."); + LOG.debug("Caching of " + key + " was aborted. We are now " + + "caching only " + newUsedBytes + " + bytes in total."); } IOUtils.closeQuietly(blockIn); IOUtils.closeQuietly(metaIn); @@ -445,9 +405,9 @@ public class FsDatasetCache { } private class UncachingTask implements Runnable { - private final Key key; + private final ExtendedBlockId key; - UncachingTask(Key key) { + UncachingTask(ExtendedBlockId key) { this.key = key; } @@ -470,8 +430,8 @@ public class FsDatasetCache { usedBytesCount.release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); if (LOG.isDebugEnabled()) { - LOG.debug("Uncaching of block " + key.id + " in " + key.bpid + - " completed. usedBytes = " + newUsedBytes); + LOG.debug("Uncaching of " + key + " completed. " + + "usedBytes = " + newUsedBytes); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index a95379a6a7..6f0fafa262 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; @@ -275,8 +274,8 @@ public class TestEnhancedByteBufferAccess { @Override public void visit(int numOutstandingMmaps, - Map replicas, - Map failedLoads, + Map replicas, + Map failedLoads, Map evictable, Map evictableMmapped) { if (expectedNumOutstandingMmaps >= 0) { @@ -341,12 +340,12 @@ public class TestEnhancedByteBufferAccess { cache.accept(new CacheVisitor() { @Override public void visit(int numOutstandingMmaps, - Map replicas, - Map failedLoads, + Map replicas, + Map failedLoads, Map evictable, Map evictableMmapped) { ShortCircuitReplica replica = replicas.get( - new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); + new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); Assert.assertNotNull(replica); Assert.assertTrue(replica.hasMmap()); // The replica should not yet be evictable, since we have it open. @@ -378,8 +377,8 @@ public class TestEnhancedByteBufferAccess { cache.accept(new CacheVisitor() { @Override public void visit(int numOutstandingMmaps, - Map replicas, - Map failedLoads, + Map replicas, + Map failedLoads, Map evictable, Map evictableMmapped) { finished.setValue(evictableMmapped.isEmpty()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index ffea447e5e..03dced7bee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -32,14 +32,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.junit.AfterClass; import org.junit.Assert; @@ -170,7 +168,7 @@ public class TestBlockReaderLocal { }; dataIn = streams[0]; metaIn = streams[1]; - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplica replica = new ShortCircuitReplica( key, dataIn, metaIn, shortCircuitCache, Time.now()); blockReaderLocal = new BlockReaderLocal.Builder( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java index 6e88042029..ce1c2275a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java @@ -20,17 +20,13 @@ package org.apache.hadoop.hdfs; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ShortCircuitCache; -import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; @@ -44,7 +40,6 @@ import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.util.Map; public class TestShortCircuitCache { static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class); @@ -105,7 +100,7 @@ public class TestShortCircuitCache { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { try { - Key key = new Key(blockId, "test_bp1"); + ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1"); return new ShortCircuitReplicaInfo( new ShortCircuitReplica(key, pair.getFileInputStreams()[0], pair.getFileInputStreams()[1], @@ -129,14 +124,14 @@ public class TestShortCircuitCache { new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000); final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = - cache.fetchOrCreate(new Key(123, "test_bp1"), + cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); Preconditions.checkNotNull(replicaInfo1.getReplica()); Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null); pair.compareWith(replicaInfo1.getReplica().getDataStream(), replicaInfo1.getReplica().getMetaStream()); ShortCircuitReplicaInfo replicaInfo2 = - cache.fetchOrCreate(new Key(123, "test_bp1"), + cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -157,7 +152,7 @@ public class TestShortCircuitCache { // really long here) ShortCircuitReplicaInfo replicaInfo3 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { Assert.fail("expected to use existing entry."); @@ -179,7 +174,7 @@ public class TestShortCircuitCache { final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); + new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); Preconditions.checkNotNull(replicaInfo1.getReplica()); Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null); pair.compareWith(replicaInfo1.getReplica().getDataStream(), @@ -190,7 +185,7 @@ public class TestShortCircuitCache { Thread.sleep(10); ShortCircuitReplicaInfo replicaInfo2 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { triedToCreate.setValue(true); @@ -221,7 +216,7 @@ public class TestShortCircuitCache { }; for (int i = 0; i < pairs.length; i++) { replicaInfos[i] = cache.fetchOrCreate( - new Key(i, "test_bp1"), + new ExtendedBlockId(i, "test_bp1"), new SimpleReplicaCreator(i, cache, pairs[i])); Preconditions.checkNotNull(replicaInfos[i].getReplica()); Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null); @@ -237,7 +232,7 @@ public class TestShortCircuitCache { for (int i = 1; i < pairs.length; i++) { final Integer iVal = new Integer(i); replicaInfos[i] = cache.fetchOrCreate( - new Key(i, "test_bp1"), + new ExtendedBlockId(i, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -253,7 +248,7 @@ public class TestShortCircuitCache { // The first (oldest) replica should not be cached. final MutableBoolean calledCreate = new MutableBoolean(false); replicaInfos[0] = cache.fetchOrCreate( - new Key(0, "test_bp1"), + new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -289,7 +284,7 @@ public class TestShortCircuitCache { final long HOUR_IN_MS = 60 * 60 * 1000; for (int i = 0; i < pairs.length; i++) { final Integer iVal = new Integer(i); - final Key key = new Key(i, "test_bp1"); + final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1"); replicaInfos[i] = cache.fetchOrCreate(key, new ShortCircuitReplicaCreator() { @Override @@ -316,7 +311,7 @@ public class TestShortCircuitCache { @Override public Boolean get() { ShortCircuitReplicaInfo info = cache.fetchOrCreate( - new Key(0, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { return null; @@ -332,7 +327,7 @@ public class TestShortCircuitCache { // Make sure that second replica did not go stale. ShortCircuitReplicaInfo info = cache.fetchOrCreate( - new Key(1, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { Assert.fail("second replica went stale, despite 1 " +