HDFS-13330. ShortCircuitCache#fetchOrCreate never retries. Contributed by Gabor Bota.
This commit is contained in:
parent
6503593711
commit
e66e287efe
@ -664,6 +664,7 @@ private void purge(ShortCircuitReplica replica) {
|
|||||||
unref(replica);
|
unref(replica);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static final int FETCH_OR_CREATE_RETRY_TIMES = 3;
|
||||||
/**
|
/**
|
||||||
* Fetch or create a replica.
|
* Fetch or create a replica.
|
||||||
*
|
*
|
||||||
@ -678,11 +679,11 @@ private void purge(ShortCircuitReplica replica) {
|
|||||||
*/
|
*/
|
||||||
public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
|
public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
|
||||||
ShortCircuitReplicaCreator creator) {
|
ShortCircuitReplicaCreator creator) {
|
||||||
Waitable<ShortCircuitReplicaInfo> newWaitable = null;
|
Waitable<ShortCircuitReplicaInfo> newWaitable;
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
ShortCircuitReplicaInfo info = null;
|
ShortCircuitReplicaInfo info = null;
|
||||||
do {
|
for (int i = 0; i < FETCH_OR_CREATE_RETRY_TIMES; i++){
|
||||||
if (closed) {
|
if (closed) {
|
||||||
LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
|
LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
|
||||||
this, key);
|
this, key);
|
||||||
@ -692,11 +693,12 @@ public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
|
|||||||
if (waitable != null) {
|
if (waitable != null) {
|
||||||
try {
|
try {
|
||||||
info = fetch(key, waitable);
|
info = fetch(key, waitable);
|
||||||
|
break;
|
||||||
} catch (RetriableException e) {
|
} catch (RetriableException e) {
|
||||||
LOG.debug("{}: retrying {}", this, e.getMessage());
|
LOG.debug("{}: retrying {}", this, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (false);
|
}
|
||||||
if (info != null) return info;
|
if (info != null) return info;
|
||||||
// We need to load the replica ourselves.
|
// We need to load the replica ourselves.
|
||||||
newWaitable = new Waitable<>(lock.newCondition());
|
newWaitable = new Waitable<>(lock.newCondition());
|
||||||
@ -717,7 +719,8 @@ public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
|
|||||||
*
|
*
|
||||||
* @throws RetriableException If the caller needs to retry.
|
* @throws RetriableException If the caller needs to retry.
|
||||||
*/
|
*/
|
||||||
private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
|
@VisibleForTesting // ONLY for testing
|
||||||
|
protected ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
|
||||||
Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
|
Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
|
||||||
// Another thread is already in the process of loading this
|
// Another thread is already in the process of loading this
|
||||||
// ShortCircuitReplica. So we simply wait for it to complete.
|
// ShortCircuitReplica. So we simply wait for it to complete.
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
@ -793,4 +794,29 @@ public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchOrCreateRetries() throws Exception {
|
||||||
|
try(ShortCircuitCache cache = Mockito
|
||||||
|
.spy(new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0))) {
|
||||||
|
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
|
||||||
|
ExtendedBlockId extendedBlockId = new ExtendedBlockId(123, "test_bp1");
|
||||||
|
SimpleReplicaCreator sRC = new SimpleReplicaCreator(123, cache, pair);
|
||||||
|
|
||||||
|
// Arrange: make fetch throw a RetriableException on every call
|
||||||
|
Mockito.doThrow(new RetriableException("Retry")).when(cache)
|
||||||
|
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
|
||||||
|
|
||||||
|
// Act: call fetchOrCreate twice
|
||||||
|
// first call: it will create the entry and put it into replicaInfoMap
|
||||||
|
// second call: it will call fetch to get info for entry, and should
|
||||||
|
// retry 3 times because a RetriableException is thrown
|
||||||
|
cache.fetchOrCreate(extendedBlockId, sRC);
|
||||||
|
cache.fetchOrCreate(extendedBlockId, sRC);
|
||||||
|
|
||||||
|
// Assert that fetchOrCreate attempted the fetch at least 3 times
|
||||||
|
Mockito.verify(cache, Mockito.atLeast(3))
|
||||||
|
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user