HDFS-6750. The DataNode should use its shared memory segment to mark short-circuit replicas that have been unlinked as stale (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1613537 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2014-07-25 23:56:47 +00:00
parent 57d8f829d9
commit e85a3fecc6
7 changed files with 163 additions and 16 deletions

CHANGES.txt

@ -321,6 +321,9 @@ Release 2.6.0 - UNRELEASED
DFSOutputStream#close gives up its attempt to contact the namenode
(mitdesai21 via cmccabe)
HDFS-6750. The DataNode should use its shared memory segment to mark
short-circuit replicas that have been unlinked as stale (cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

ShortCircuitRegistry.java

@ -74,7 +74,7 @@
* DN also marks the block's slots as "unanchorable" to prevent additional
* clients from initiating these operations in the future.
*
* The counterpart of this class on the client is {@link DfsClientShmManager}.
*/
public class ShortCircuitRegistry {
public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@ -218,6 +218,31 @@ public synchronized boolean processBlockMunlockRequest(
return allowMunlock;
}
/**
* Invalidate any slot associated with a blockId that we are invalidating
* (deleting) from this DataNode. When a slot is invalid, the DFSClient will
* not use the corresponding replica for new read or mmap operations (although
* existing, ongoing read or mmap operations will complete.)
*
* @param blockId The block ID.
*/
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
if (!enabled) return;
final Set<Slot> affectedSlots = slots.get(blockId);
if (!affectedSlots.isEmpty()) {
final StringBuilder bld = new StringBuilder();
String prefix = "";
bld.append("Block ").append(blockId).append(" has been invalidated. ").
append("Marking short-circuit slots as invalid: ");
for (Slot slot : affectedSlots) {
slot.makeInvalid();
bld.append(prefix).append(slot.toString());
prefix = ", ";
}
LOG.info(bld.toString());
}
}
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
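
To make the processBlockInvalidation method added above concrete, here is a minimal, self-contained sketch of the same pattern: a registry maps block IDs to the slots handed out for that block, and invalidation simply clears each slot's valid bit so clients stop starting new reads against the deleted replica. MiniSlot and MiniRegistry are hypothetical names used only for illustration; the real Slot writes to an mmapped shared memory segment through sun.misc.Unsafe rather than an AtomicLong.

// Illustrative sketch, not the Hadoop implementation.
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

class MiniSlot {
  private static final long VALID_FLAG = 1L << 63;   // assumed flag position
  private final AtomicLong word = new AtomicLong();  // stand-in for shared memory

  void makeValid()   { word.getAndUpdate(w -> w | VALID_FLAG); }
  void makeInvalid() { word.getAndUpdate(w -> w & ~VALID_FLAG); }
  boolean isValid()  { return (word.get() & VALID_FLAG) != 0; }
}

class MiniRegistry {
  // blockId -> slots handed out to short-circuit clients for that replica
  private final Map<String, Set<MiniSlot>> slots = new HashMap<>();

  synchronized void register(String blockId, MiniSlot slot) {
    slots.computeIfAbsent(blockId, k -> new HashSet<>()).add(slot);
  }

  // Analogue of processBlockInvalidation: clearing the valid bit tells
  // clients not to start new reads or mmaps against the deleted replica,
  // while operations already in progress are left alone.
  synchronized void processBlockInvalidation(String blockId) {
    for (MiniSlot slot : slots.getOrDefault(blockId, Collections.emptySet())) {
      slot.makeInvalid();
    }
  }
}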

FsDatasetImpl.java

@ -44,6 +44,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@ -1232,8 +1233,15 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
}
volumeMap.remove(bpid, invalidBlks[i]);
}
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
datanode.getShortCircuitRegistry().processBlockInvalidation(
new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
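
As a reading aid, the per-block teardown order this hunk establishes can be sketched as below. Every method here is a placeholder stub, not the real FsDatasetImpl API; the key point, echoed in the comments above, is that the asynchronous unlink may begin before uncaching has finished.

// Illustrative only: the sequence invalidate() follows for each deleted block.
class ReplicaTeardownSketch {
  void invalidateOne(String bpid, long blockId) {
    removeFromReplicaMap(bpid, blockId);  // forget the replica in the volume map
    markShmSlotsInvalid(bpid, blockId);   // clients with cached FDs now see the replica as stale
    startUncaching(bpid, blockId);        // begin releasing any mlocked pages
    unlinkFilesAsync(bpid, blockId);      // async delete; safe even if uncaching is still running
  }

  // Placeholder stubs so the sketch compiles; in the real code this work is
  // done by volumeMap, the ShortCircuitRegistry, cacheManager, and the
  // asynchronous disk service.
  void removeFromReplicaMap(String bpid, long blockId) { }
  void markShmSlotsInvalid(String bpid, long blockId) { }
  void startUncaching(String bpid, long blockId) { }
  void unlinkFilesAsync(String bpid, long blockId) { }
}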

DfsClientShm.java

@ -32,11 +32,16 @@
* DfsClientShm is a subclass of ShortCircuitShm which is used by the
* DfsClient.
* When the UNIX domain socket associated with this shared memory segment
* closes unexpectedly, we mark the slots inside this segment as disconnected.
* ShortCircuitReplica objects that contain disconnected slots are stale,
* and will not be used to service new reads or mmap operations.
* However, in-progress read or mmap operations will continue to proceed.
* Once the last slot is deallocated, the segment can be safely munmapped.
*
* Slots may also become stale because the associated replica has been deleted
* on the DataNode. In this case, the DataNode will clear the 'valid' bit.
* The client will then see these slots as stale (see
* #{ShortCircuitReplica#isStale}).
*/
public class DfsClientShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
@ -58,7 +63,7 @@ public class DfsClientShm extends ShortCircuitShm
*
* {@link DfsClientShm#handle} sets this to true.
*/
private boolean disconnected = false;
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
DomainPeer peer) throws IOException {
@ -76,14 +81,14 @@ public DomainPeer getPeer() {
}
/**
* Determine if the shared memory segment is disconnected from the DataNode.
*
* This must be called with the DfsClientShmManager lock held.
*
* @return True if the shared memory segment is stale.
*/
public synchronized boolean isDisconnected() {
return disconnected;
}
/**
@ -97,8 +102,8 @@ public synchronized boolean isStale() {
public boolean handle(DomainSocket sock) {
manager.unregisterShm(getShmId());
synchronized (this) {
Preconditions.checkState(!disconnected);
disconnected = true;
boolean hadSlots = false;
for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
Slot slot = iter.next();
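
The class comment above describes two ways a slot can stop being usable: the whole segment is disconnected because its UNIX domain socket closed, or the DataNode cleared the slot's valid bit after deleting the replica. A simplified sketch of the resulting client-side check follows, with hypothetical names; the real logic lives in ShortCircuitReplica#isStale.

// Simplified illustration, not the real ShortCircuitReplica code.
class StaleCheckSketch {
  interface SlotView { boolean isValid(); }

  static boolean isStale(SlotView slot) {
    if (slot == null) {
      // No shared-memory slot: a time-based fallback (omitted here)
      // decides staleness instead.
      return false;
    }
    // A cleared valid bit means "do not start new reads or mmaps against
    // this replica". The DataNode clears it after deleting the replica;
    // per the class comment above, slots of a disconnected segment are
    // treated the same way.
    return !slot.isValid();
  }
}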

DfsClientShmManager.java

@ -271,12 +271,12 @@ Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
loading = false;
finishedLoading.signalAll();
}
if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
// fired and marked the shm as disconnected. In this case, we
// obviously don't want to add the SharedMemorySegment to our list
// of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
@ -299,7 +299,7 @@ Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
void freeSlot(Slot slot) {
DfsClientShm shm = (DfsClientShm)slot.getShm();
shm.unregisterSlot(slot.getSlotIdx());
if (shm.isDisconnected()) {
// Stale shared memory segments should not be tracked here.
Preconditions.checkState(!full.containsKey(shm.getShmId()));
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
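
The two hunks above enforce one bookkeeping rule: a disconnected segment must never sit in the notFull map that allocSlot draws from. A minimal, hypothetical sketch of that rule follows; the names are illustrative, and the real manager also tracks a full map, per-DataNode state, and peer sockets.

// Hypothetical, simplified bookkeeping; not the real DfsClientShmManager.
import java.util.HashMap;
import java.util.Map;

class ShmPoolSketch {
  static class Segment {
    private boolean disconnected;
    synchronized void markDisconnected() { disconnected = true; }
    synchronized boolean isDisconnected() { return disconnected; }
  }

  // Segments that still have free slots and may serve new allocations.
  private final Map<String, Segment> notFull = new HashMap<>();

  synchronized void onSegmentCreated(String shmId, Segment seg) {
    // The socket-watcher callback can fire the moment the peer closes,
    // marking the brand-new segment disconnected before we get here.
    if (seg.isDisconnected()) {
      return;                      // never offer it to future allocations
    }
    notFull.put(shmId, seg);
  }

  synchronized void onSlotFreed(String shmId, Segment seg) {
    if (seg.isDisconnected()) {
      // Disconnected segments are not tracked in the allocation pool.
      assert !notFull.containsKey(shmId);
      return;
    }
    // ...otherwise the segment goes back to (or stays in) notFull.
    notFull.put(shmId, seg);
  }
}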

ShortCircuitShm.java

@ -306,6 +306,13 @@ public int getSlotIdx() {
(slotAddress - baseAddress) / BYTES_PER_SLOT);
}
/**
* Clear the slot.
*/
void clear() {
unsafe.putLongVolatile(null, this.slotAddress, 0);
}
private boolean isSet(long flag) {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
return (prev & flag) != 0;
@ -535,6 +542,7 @@ synchronized public final Slot allocAndRegisterSlot(
}
allocatedSlots.set(idx, true);
Slot slot = new Slot(calculateSlotAddress(idx), blockId);
slot.clear();
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
@ -583,7 +591,7 @@ synchronized public final Slot registerSlot(int slotIdx,
Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
if (!slot.isValid()) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
" is not marked as valid.");
}
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
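
The new clear() call matters because slots live in a shared, recycled region of memory: a slot being allocated may still carry flags or counters written by a previous owner, so the word is zeroed before the valid bit is set. Below is a rough model of the slot word; the real Slot writes to mmapped memory through sun.misc.Unsafe, and the exact bit positions shown are assumptions for illustration.

// Illustrative model only; not the real ShortCircuitShm.Slot.
import java.util.concurrent.atomic.AtomicLong;

class SlotWordSketch {
  private static final long VALID_FLAG      = 1L << 63;  // assumed positions
  private static final long ANCHORABLE_FLAG = 1L << 62;

  private final AtomicLong word = new AtomicLong();      // stand-in for shared memory

  void clear()           { word.set(0); }                // wipe any leftover state
  void makeValid()       { word.getAndUpdate(w -> w | VALID_FLAG); }
  void makeInvalid()     { word.getAndUpdate(w -> w & ~VALID_FLAG); }
  boolean isValid()      { return (word.get() & VALID_FLAG) != 0; }
  boolean isAnchorable() { return (word.get() & ANCHORABLE_FLAG) != 0; }

  // Mirrors the ordering in allocAndRegisterSlot above: clear first, then
  // mark valid, so once the valid flag is visible no leftover bits from a
  // previous owner remain in the word.
  void allocate() {
    clear();
    makeValid();
  }
}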

TestShortCircuitCache.java

@ -23,6 +23,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream;
@ -30,7 +31,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableBoolean;
@ -462,6 +465,7 @@ public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
@Test(timeout=60000)
@ -516,4 +520,98 @@ public void visit(int numOutstandingMmaps,
});
cluster.shutdown();
}
/**
* Test unlinking a file whose blocks we are caching in the DFSClient.
* The DataNode will notify the DFSClient that the replica is stale via the
* ShortCircuitShm.
*/
@Test(timeout=60000)
public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testUnlinkingReplicasInFileDescriptorCache", sockDir);
// We don't want the CacheCleaner to time out short-circuit shared memory
// segments during the test, so set the timeout really high.
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
// The ClientShmManager starts off empty.
Assert.assertEquals(0, info.size());
}
});
final Path TEST_PATH = new Path("/test_file");
final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADE0;
DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN,
(short)1, SEED);
byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
// Loading this file brought the ShortCircuitReplica into our local
// replica cache.
final DatanodeInfo datanode =
new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1, info.get(datanode).notFull.values().size());
DfsClientShm shm =
info.get(datanode).notFull.values().iterator().next();
Assert.assertFalse(shm.isDisconnected());
}
});
// Remove the file whose blocks we just read.
fs.delete(TEST_PATH, false);
// Wait for the replica to be purged from the DFSClient's cache.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
MutableBoolean done = new MutableBoolean(true);
@Override
public Boolean get() {
try {
done.setValue(true);
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo,
PerDatanodeVisitorInfo> info) throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1,
info.get(datanode).notFull.values().size());
DfsClientShm shm = info.get(datanode).notFull.values().
iterator().next();
// Check that all slots have been invalidated.
for (Iterator<Slot> iter = shm.slotIterator();
iter.hasNext(); ) {
Slot slot = iter.next();
if (slot.isValid()) {
done.setValue(false);
}
}
}
});
} catch (IOException e) {
LOG.error("error running visitor", e);
}
return done.booleanValue();
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
}