HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7 ThreadLocalRandom (Sean Busbey via Colin P. McCabe)
This commit is contained in:
parent
c97f32e7b9
commit
470c87dbc6
@ -595,6 +595,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up
|
HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up
|
||||||
split calculation (gera)
|
split calculation (gera)
|
||||||
|
|
||||||
|
HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7
|
||||||
|
ThreadLocalRandom. (Sean Busbey via Colin P. McCabe)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
|
HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
|
||||||
is an I/O error during requestShortCircuitShm (cmccabe)
|
is an I/O error during requestShortCircuitShm (cmccabe)
|
||||||
|
@ -28,7 +28,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -47,13 +47,6 @@ public class RetryPolicies {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
|
public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
|
||||||
|
|
||||||
private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
|
|
||||||
@Override
|
|
||||||
protected Random initialValue() {
|
|
||||||
return new Random();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Try once, and fail by re-throwing the exception.
|
* Try once, and fail by re-throwing the exception.
|
||||||
@ -321,7 +314,8 @@ public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
|
|||||||
}
|
}
|
||||||
|
|
||||||
//calculate sleep time and return.
|
//calculate sleep time and return.
|
||||||
final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5
|
// ensure 0.5 <= ratio <=1.5
|
||||||
|
final double ratio = ThreadLocalRandom.current().nextDouble() + 0.5;
|
||||||
final long sleepTime = Math.round(p.sleepMillis * ratio);
|
final long sleepTime = Math.round(p.sleepMillis * ratio);
|
||||||
return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime);
|
return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime);
|
||||||
}
|
}
|
||||||
@ -610,7 +604,7 @@ public RetryAction shouldRetry(Exception e, int retries,
|
|||||||
private static long calculateExponentialTime(long time, int retries,
|
private static long calculateExponentialTime(long time, int retries,
|
||||||
long cap) {
|
long cap) {
|
||||||
long baseTime = Math.min(time * (1L << retries), cap);
|
long baseTime = Math.min(time * (1L << retries), cap);
|
||||||
return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5));
|
return (long) (baseTime * (ThreadLocalRandom.current().nextDouble() + 0.5));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long calculateExponentialTime(long time, int retries) {
|
private static long calculateExponentialTime(long time, int retries) {
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -316,7 +317,8 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
|||||||
|
|
||||||
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
|
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
|
||||||
this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
|
this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
|
||||||
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
|
ThreadLocalRandom.current().nextInt() + "_" +
|
||||||
|
Thread.currentThread().getId();
|
||||||
int numResponseToDrop = conf.getInt(
|
int numResponseToDrop = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@ -976,7 +977,9 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
|
|||||||
// expanded to 9000ms.
|
// expanded to 9000ms.
|
||||||
final int timeWindow = dfsClient.getConf().getTimeWindow();
|
final int timeWindow = dfsClient.getConf().getTimeWindow();
|
||||||
double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
||||||
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
|
// expanding time window for each failure
|
||||||
|
timeWindow * (failures + 1) *
|
||||||
|
ThreadLocalRandom.current().nextDouble();
|
||||||
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
||||||
Thread.sleep((long)waitTime);
|
Thread.sleep((long)waitTime);
|
||||||
} catch (InterruptedException iex) {
|
} catch (InterruptedException iex) {
|
||||||
|
@ -50,8 +50,8 @@
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
@ -103,12 +103,6 @@ public class DFSUtil {
|
|||||||
public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
|
public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
|
||||||
|
|
||||||
private DFSUtil() { /* Hidden constructor */ }
|
private DFSUtil() { /* Hidden constructor */ }
|
||||||
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
|
|
||||||
@Override
|
|
||||||
protected Random initialValue() {
|
|
||||||
return new Random();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
|
private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
|
||||||
@Override
|
@Override
|
||||||
@ -117,11 +111,6 @@ protected SecureRandom initialValue() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** @return a pseudo random number generator. */
|
|
||||||
public static Random getRandom() {
|
|
||||||
return RANDOM.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @return a pseudo secure random number generator. */
|
/** @return a pseudo secure random number generator. */
|
||||||
public static SecureRandom getSecureRandom() {
|
public static SecureRandom getSecureRandom() {
|
||||||
return SECURE_RANDOM.get();
|
return SECURE_RANDOM.get();
|
||||||
@ -130,9 +119,8 @@ public static SecureRandom getSecureRandom() {
|
|||||||
/** Shuffle the elements in the given array. */
|
/** Shuffle the elements in the given array. */
|
||||||
public static <T> T[] shuffle(final T[] array) {
|
public static <T> T[] shuffle(final T[] array) {
|
||||||
if (array != null && array.length > 0) {
|
if (array != null && array.length > 0) {
|
||||||
final Random random = getRandom();
|
|
||||||
for (int n = array.length; n > 1; ) {
|
for (int n = array.length; n > 1; ) {
|
||||||
final int randomIndex = random.nextInt(n);
|
final int randomIndex = ThreadLocalRandom.current().nextInt(n);
|
||||||
n--;
|
n--;
|
||||||
if (n != randomIndex) {
|
if (n != randomIndex) {
|
||||||
final T tmp = array[randomIndex];
|
final T tmp = array[randomIndex];
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
@ -1027,7 +1028,8 @@ private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
|||||||
return new BlocksWithLocations(new BlockWithLocations[0]);
|
return new BlocksWithLocations(new BlockWithLocations[0]);
|
||||||
}
|
}
|
||||||
Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
|
Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
|
||||||
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
|
// starting from a random block
|
||||||
|
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
|
||||||
// skip blocks
|
// skip blocks
|
||||||
for(int i=0; i<startBlock; i++) {
|
for(int i=0; i<startBlock; i++) {
|
||||||
iter.next();
|
iter.next();
|
||||||
@ -1669,7 +1671,7 @@ else if (node.isDecommissionInProgress()) {
|
|||||||
// switch to a different node randomly
|
// switch to a different node randomly
|
||||||
// this to prevent from deterministically selecting the same node even
|
// this to prevent from deterministically selecting the same node even
|
||||||
// if the node failed to replicate the block on previous iterations
|
// if the node failed to replicate the block on previous iterations
|
||||||
if(DFSUtil.getRandom().nextBoolean())
|
if(ThreadLocalRandom.current().nextBoolean())
|
||||||
srcNode = node;
|
srcNode = node;
|
||||||
}
|
}
|
||||||
if(numReplicas != null)
|
if(numReplicas != null)
|
||||||
@ -1920,7 +1922,7 @@ void rescanPostponedMisreplicatedBlocks() {
|
|||||||
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
|
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
|
||||||
long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
|
long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
|
||||||
if (base > 0) {
|
if (base > 0) {
|
||||||
startIndex = DFSUtil.getRandom().nextLong() % (base+1);
|
startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
|
||||||
if (startIndex < 0) {
|
if (startIndex < 0) {
|
||||||
startIndex += (base+1);
|
startIndex += (base+1);
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manage datanodes, include decommission and other activities.
|
* Manage datanodes, include decommission and other activities.
|
||||||
@ -457,7 +458,7 @@ DatanodeDescriptor getDatanodeDescriptor(String address) {
|
|||||||
// Try something rack local.
|
// Try something rack local.
|
||||||
if (node == null && !rackNodes.isEmpty()) {
|
if (node == null && !rackNodes.isEmpty()) {
|
||||||
node = (DatanodeDescriptor) (rackNodes
|
node = (DatanodeDescriptor) (rackNodes
|
||||||
.get(DFSUtil.getRandom().nextInt(rackNodes.size())));
|
.get(ThreadLocalRandom.current().nextInt(rackNodes.size())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,12 +20,12 @@
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
|
|
||||||
/** A map from host names to datanode descriptors. */
|
/** A map from host names to datanode descriptors. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@ -161,7 +161,7 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
|
|||||||
return nodes[0];
|
return nodes[0];
|
||||||
}
|
}
|
||||||
// more than one node
|
// more than one node
|
||||||
return nodes[DFSUtil.getRandom().nextInt(nodes.length)];
|
return nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
|
||||||
} finally {
|
} finally {
|
||||||
hostmapLock.readLock().unlock();
|
hostmapLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
@ -28,13 +28,13 @@
|
|||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
@ -122,7 +122,7 @@ static enum RunningState {
|
|||||||
this.dn = bpos.getDataNode();
|
this.dn = bpos.getDataNode();
|
||||||
this.nnAddr = nnAddr;
|
this.nnAddr = nnAddr;
|
||||||
this.dnConf = dn.getDnConf();
|
this.dnConf = dn.getDnConf();
|
||||||
prevBlockReportId = DFSUtil.getRandom().nextLong();
|
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
||||||
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
|
scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,7 +409,7 @@ private long generateUniqueBlockReportId() {
|
|||||||
// not send a 0 value ourselves.
|
// not send a 0 value ourselves.
|
||||||
prevBlockReportId++;
|
prevBlockReportId++;
|
||||||
while (prevBlockReportId == 0) {
|
while (prevBlockReportId == 0) {
|
||||||
prevBlockReportId = DFSUtil.getRandom().nextLong();
|
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
||||||
}
|
}
|
||||||
return prevBlockReportId;
|
return prevBlockReportId;
|
||||||
}
|
}
|
||||||
@ -1054,7 +1054,7 @@ long scheduleBlockReport(long delay) {
|
|||||||
if (delay > 0) { // send BR after random delay
|
if (delay > 0) { // send BR after random delay
|
||||||
// Numerical overflow is possible here and is okay.
|
// Numerical overflow is possible here and is okay.
|
||||||
nextBlockReportTime =
|
nextBlockReportTime =
|
||||||
monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
|
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay));
|
||||||
} else { // send at next heartbeat
|
} else { // send at next heartbeat
|
||||||
nextBlockReportTime = monotonicNow();
|
nextBlockReportTime = monotonicNow();
|
||||||
}
|
}
|
||||||
@ -1073,7 +1073,7 @@ void scheduleNextBlockReport() {
|
|||||||
// time before we start the periodic block reports.
|
// time before we start the periodic block reports.
|
||||||
if (resetBlockReportTime) {
|
if (resetBlockReportTime) {
|
||||||
nextBlockReportTime = monotonicNow() +
|
nextBlockReportTime = monotonicNow() +
|
||||||
DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
|
ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
|
||||||
resetBlockReportTime = false;
|
resetBlockReportTime = false;
|
||||||
} else {
|
} else {
|
||||||
/* say the last block report was at 8:20:14. The current report
|
/* say the last block report was at 8:20:14. The current report
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
@ -41,7 +42,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
@ -327,7 +327,8 @@ public long getGenStamp() {
|
|||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
shouldRun = true;
|
shouldRun = true;
|
||||||
long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
|
long offset = ThreadLocalRandom.current().nextInt(
|
||||||
|
(int) (scanPeriodMsecs/1000L)) * 1000L; //msec
|
||||||
long firstScanTime = Time.now() + offset;
|
long firstScanTime = Time.now() + offset;
|
||||||
LOG.info("Periodic Directory Tree Verification scan starting at "
|
LOG.info("Periodic Directory Tree Verification scan starting at "
|
||||||
+ firstScanTime + " with interval " + scanPeriodMsecs);
|
+ firstScanTime + " with interval " + scanPeriodMsecs);
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
@ -33,6 +32,8 @@
|
|||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* This class is for maintaining the various DataNode statistics
|
* This class is for maintaining the various DataNode statistics
|
||||||
@ -177,7 +178,7 @@ public static DataNodeMetrics create(Configuration conf, String dnName) {
|
|||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms);
|
JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms);
|
||||||
String name = "DataNodeActivity-"+ (dnName.isEmpty()
|
String name = "DataNodeActivity-"+ (dnName.isEmpty()
|
||||||
? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt()
|
? "UndefinedDataNodeName"+ ThreadLocalRandom.current().nextInt()
|
||||||
: dnName.replace(':', '-'));
|
: dnName.replace(':', '-'));
|
||||||
|
|
||||||
// Percentile measurement is off by default, by watching no intervals
|
// Percentile measurement is off by default, by watching no intervals
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -599,7 +600,7 @@ public void format() throws IOException {
|
|||||||
private static int newNamespaceID() {
|
private static int newNamespaceID() {
|
||||||
int newID = 0;
|
int newID = 0;
|
||||||
while(newID == 0)
|
while(newID == 0)
|
||||||
newID = DFSUtil.getRandom().nextInt(0x7FFFFFFF); // use 31 bits only
|
newID = ThreadLocalRandom.current().nextInt(0x7FFFFFFF); // use 31 bits
|
||||||
return newID;
|
return newID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@ -44,7 +45,6 @@
|
|||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
@ -933,7 +933,7 @@ private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
|
|||||||
}
|
}
|
||||||
DatanodeInfo chosenNode;
|
DatanodeInfo chosenNode;
|
||||||
do {
|
do {
|
||||||
chosenNode = nodes[DFSUtil.getRandom().nextInt(nodes.length)];
|
chosenNode = nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
|
||||||
} while (deadNodes.contains(chosenNode));
|
} while (deadNodes.contains(chosenNode));
|
||||||
return chosenNode;
|
return chosenNode;
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
@ -214,8 +215,7 @@ String deleteSnapshot(String snapshot) throws IOException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String call() throws Exception {
|
public String call() throws Exception {
|
||||||
final Random r = DFSUtil.getRandom();
|
final int op = ThreadLocalRandom.current().nextInt(6);
|
||||||
final int op = r.nextInt(6);
|
|
||||||
if (op <= 1) {
|
if (op <= 1) {
|
||||||
pauseAllFiles();
|
pauseAllFiles();
|
||||||
try {
|
try {
|
||||||
@ -229,7 +229,8 @@ public String call() throws Exception {
|
|||||||
if (keys.length == 0) {
|
if (keys.length == 0) {
|
||||||
return "NO-OP";
|
return "NO-OP";
|
||||||
}
|
}
|
||||||
final String snapshot = keys[r.nextInt(keys.length)];
|
final String snapshot = keys[ThreadLocalRandom.current()
|
||||||
|
.nextInt(keys.length)];
|
||||||
final String s = checkSnapshot(snapshot);
|
final String s = checkSnapshot(snapshot);
|
||||||
|
|
||||||
if (op == 2) {
|
if (op == 2) {
|
||||||
@ -292,13 +293,13 @@ static class FileWorker extends Worker {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String call() throws IOException {
|
public String call() throws IOException {
|
||||||
final Random r = DFSUtil.getRandom();
|
final int op = ThreadLocalRandom.current().nextInt(9);
|
||||||
final int op = r.nextInt(9);
|
|
||||||
if (op == 0) {
|
if (op == 0) {
|
||||||
return checkFullFile();
|
return checkFullFile();
|
||||||
} else {
|
} else {
|
||||||
final int nBlocks = r.nextInt(4) + 1;
|
final int nBlocks = ThreadLocalRandom.current().nextInt(4) + 1;
|
||||||
final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1;
|
final int lastBlockSize = ThreadLocalRandom.current()
|
||||||
|
.nextInt(BLOCK_SIZE) + 1;
|
||||||
final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
|
final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
|
||||||
|
|
||||||
if (op <= 4) {
|
if (op <= 4) {
|
||||||
@ -316,7 +317,7 @@ String append(int n) throws IOException {
|
|||||||
.append(n).append(" bytes to ").append(file.getName());
|
.append(n).append(" bytes to ").append(file.getName());
|
||||||
|
|
||||||
final byte[] bytes = new byte[n];
|
final byte[] bytes = new byte[n];
|
||||||
DFSUtil.getRandom().nextBytes(bytes);
|
ThreadLocalRandom.current().nextBytes(bytes);
|
||||||
|
|
||||||
{ // write to local file
|
{ // write to local file
|
||||||
final FileOutputStream out = new FileOutputStream(localFile, true);
|
final FileOutputStream out = new FileOutputStream(localFile, true);
|
||||||
@ -446,7 +447,6 @@ void start() {
|
|||||||
final Thread t = new Thread(null, new Runnable() {
|
final Thread t = new Thread(null, new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
final Random r = DFSUtil.getRandom();
|
|
||||||
for(State s; !(s = checkErrorState()).isTerminated;) {
|
for(State s; !(s = checkErrorState()).isTerminated;) {
|
||||||
if (s == State.RUNNING) {
|
if (s == State.RUNNING) {
|
||||||
isCalling.set(true);
|
isCalling.set(true);
|
||||||
@ -458,7 +458,7 @@ public void run() {
|
|||||||
}
|
}
|
||||||
isCalling.set(false);
|
isCalling.set(false);
|
||||||
}
|
}
|
||||||
sleep(r.nextInt(100) + 50);
|
sleep(ThreadLocalRandom.current().nextInt(100) + 50);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, name);
|
}, name);
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -272,7 +273,7 @@ public void testRollback() throws IOException {
|
|||||||
|
|
||||||
final Path file = new Path(foo, "file");
|
final Path file = new Path(foo, "file");
|
||||||
final byte[] data = new byte[1024];
|
final byte[] data = new byte[1024];
|
||||||
DFSUtil.getRandom().nextBytes(data);
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
final FSDataOutputStream out = cluster.getFileSystem().create(file);
|
final FSDataOutputStream out = cluster.getFileSystem().create(file);
|
||||||
out.write(data, 0, data.length);
|
out.write(data, 0, data.length);
|
||||||
out.close();
|
out.close();
|
||||||
@ -320,7 +321,8 @@ private static void startRollingUpgrade(Path foo, Path bar,
|
|||||||
Assert.assertTrue(dfs.exists(bar));
|
Assert.assertTrue(dfs.exists(bar));
|
||||||
|
|
||||||
//truncate a file
|
//truncate a file
|
||||||
final int newLength = DFSUtil.getRandom().nextInt(data.length - 1) + 1;
|
final int newLength = ThreadLocalRandom.current().nextInt(data.length - 1)
|
||||||
|
+ 1;
|
||||||
dfs.truncate(file, newLength);
|
dfs.truncate(file, newLength);
|
||||||
TestFileTruncate.checkBlockRecovery(file, dfs);
|
TestFileTruncate.checkBlockRecovery(file, dfs);
|
||||||
AppendTestUtil.checkFullFile(dfs, file, newLength, data);
|
AppendTestUtil.checkFullFile(dfs, file, newLength, data);
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -75,7 +76,6 @@ public class TestReplicationPolicy {
|
|||||||
((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
|
((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Random random = DFSUtil.getRandom();
|
|
||||||
private static final int BLOCK_SIZE = 1024;
|
private static final int BLOCK_SIZE = 1024;
|
||||||
private static final int NUM_OF_DATANODES = 6;
|
private static final int NUM_OF_DATANODES = 6;
|
||||||
private static NetworkTopology cluster;
|
private static NetworkTopology cluster;
|
||||||
@ -850,14 +850,16 @@ public void testReplicationWithPriority() throws Exception {
|
|||||||
.getNamesystem().getBlockManager().neededReplications;
|
.getNamesystem().getBlockManager().neededReplications;
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
// Adding the blocks directly to normal priority
|
// Adding the blocks directly to normal priority
|
||||||
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
|
neededReplications.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 2, 0, 3);
|
||||||
}
|
}
|
||||||
// Lets wait for the replication interval, to start process normal
|
// Lets wait for the replication interval, to start process normal
|
||||||
// priority blocks
|
// priority blocks
|
||||||
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
||||||
|
|
||||||
// Adding the block directly to high priority list
|
// Adding the block directly to high priority list
|
||||||
neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
|
neededReplications.add(new Block(ThreadLocalRandom.current().nextLong()),
|
||||||
|
1, 0, 3);
|
||||||
|
|
||||||
// Lets wait for the replication interval
|
// Lets wait for the replication interval
|
||||||
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
||||||
@ -880,19 +882,24 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
|
|||||||
|
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
// Adding QUEUE_HIGHEST_PRIORITY block
|
// Adding QUEUE_HIGHEST_PRIORITY block
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 1, 0, 3);
|
||||||
|
|
||||||
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 2, 0, 7);
|
||||||
|
|
||||||
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
|
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 6, 0, 6);
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 5, 0, 6);
|
||||||
|
|
||||||
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
|
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
||||||
|
.nextLong()), 0, 0, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
|
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
|
||||||
@ -908,7 +915,8 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
|
|||||||
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
|
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
|
||||||
|
|
||||||
// Adding QUEUE_HIGHEST_PRIORITY
|
// Adding QUEUE_HIGHEST_PRIORITY
|
||||||
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
|
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current().nextLong()),
|
||||||
|
1, 0, 3);
|
||||||
|
|
||||||
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
|
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
|
||||||
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
|
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
|
||||||
@ -1100,9 +1108,9 @@ public void testGetReplWorkMultiplier() {
|
|||||||
public void testUpdateDoesNotCauseSkippedReplication() {
|
public void testUpdateDoesNotCauseSkippedReplication() {
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
|
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
|
||||||
|
|
||||||
Block block1 = new Block(random.nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(random.nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block3 = new Block(random.nextLong());
|
Block block3 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
||||||
final int block1CurReplicas = 2;
|
final int block1CurReplicas = 2;
|
||||||
@ -1149,8 +1157,8 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
Block block1 = new Block(random.nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(random.nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
@ -1195,8 +1203,8 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
Block block1 = new Block(random.nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(random.nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
@ -1258,8 +1266,8 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
Block block1 = new Block(random.nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(random.nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -46,7 +47,6 @@
|
|||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
@ -164,11 +164,11 @@ public void testMultipleTruncate() throws IOException {
|
|||||||
fs.mkdirs(dir);
|
fs.mkdirs(dir);
|
||||||
final Path p = new Path(dir, "file");
|
final Path p = new Path(dir, "file");
|
||||||
final byte[] data = new byte[100 * BLOCK_SIZE];
|
final byte[] data = new byte[100 * BLOCK_SIZE];
|
||||||
DFSUtil.getRandom().nextBytes(data);
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
writeContents(data, data.length, p);
|
writeContents(data, data.length, p);
|
||||||
|
|
||||||
for(int n = data.length; n > 0; ) {
|
for(int n = data.length; n > 0; ) {
|
||||||
final int newLength = DFSUtil.getRandom().nextInt(n);
|
final int newLength = ThreadLocalRandom.current().nextInt(n);
|
||||||
final boolean isReady = fs.truncate(p, newLength);
|
final boolean isReady = fs.truncate(p, newLength);
|
||||||
LOG.info("newLength=" + newLength + ", isReady=" + isReady);
|
LOG.info("newLength=" + newLength + ", isReady=" + isReady);
|
||||||
assertEquals("File must be closed for truncating at the block boundary",
|
assertEquals("File must be closed for truncating at the block boundary",
|
||||||
@ -193,7 +193,7 @@ public void testSnapshotTruncateThenDeleteSnapshot() throws IOException {
|
|||||||
fs.allowSnapshot(dir);
|
fs.allowSnapshot(dir);
|
||||||
final Path p = new Path(dir, "file");
|
final Path p = new Path(dir, "file");
|
||||||
final byte[] data = new byte[BLOCK_SIZE];
|
final byte[] data = new byte[BLOCK_SIZE];
|
||||||
DFSUtil.getRandom().nextBytes(data);
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
writeContents(data, data.length, p);
|
writeContents(data, data.length, p);
|
||||||
final String snapshot = "s0";
|
final String snapshot = "s0";
|
||||||
fs.createSnapshot(dir, snapshot);
|
fs.createSnapshot(dir, snapshot);
|
||||||
@ -226,7 +226,7 @@ public void testTruncateWithOtherOperations() throws IOException {
|
|||||||
final Path p = new Path(dir, "file");
|
final Path p = new Path(dir, "file");
|
||||||
final byte[] data = new byte[2 * BLOCK_SIZE];
|
final byte[] data = new byte[2 * BLOCK_SIZE];
|
||||||
|
|
||||||
DFSUtil.getRandom().nextBytes(data);
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
writeContents(data, data.length, p);
|
writeContents(data, data.length, p);
|
||||||
|
|
||||||
final int newLength = data.length - 1;
|
final int newLength = data.length - 1;
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
@ -37,7 +38,6 @@
|
|||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
@ -640,7 +640,7 @@ public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
|
|||||||
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
||||||
|
|
||||||
List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
|
List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
|
||||||
return l.get(DFSUtil.getRandom().nextInt(l.size()));
|
return l.get(ThreadLocalRandom.current().nextInt(l.size()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
@ -27,7 +28,6 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
|
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
|
||||||
@ -73,7 +73,7 @@ public void testMultipleAppendsDuringCatchupTailing() throws Exception {
|
|||||||
Path fileToTruncate = new Path("/FileToTruncate");
|
Path fileToTruncate = new Path("/FileToTruncate");
|
||||||
|
|
||||||
final byte[] data = new byte[1 << 16];
|
final byte[] data = new byte[1 << 16];
|
||||||
DFSUtil.getRandom().nextBytes(data);
|
ThreadLocalRandom.current().nextBytes(data);
|
||||||
final int[] appendPos = AppendTestUtil.randomFilePartition(
|
final int[] appendPos = AppendTestUtil.randomFilePartition(
|
||||||
data.length, COUNT);
|
data.length, COUNT);
|
||||||
final int[] truncatePos = AppendTestUtil.randomFilePartition(
|
final int[] truncatePos = AppendTestUtil.randomFilePartition(
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -34,7 +35,6 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
|
||||||
@ -72,7 +72,7 @@ public void testCounter() throws Exception {
|
|||||||
final long countResetTimePeriodMs = 200L;
|
final long countResetTimePeriodMs = 200L;
|
||||||
final Counter c = new Counter(countResetTimePeriodMs);
|
final Counter c = new Counter(countResetTimePeriodMs);
|
||||||
|
|
||||||
final int n = DFSUtil.getRandom().nextInt(512) + 512;
|
final int n = ThreadLocalRandom.current().nextInt(512) + 512;
|
||||||
final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
|
final List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
|
||||||
|
|
||||||
final ExecutorService pool = Executors.newFixedThreadPool(32);
|
final ExecutorService pool = Executors.newFixedThreadPool(32);
|
||||||
@ -334,7 +334,7 @@ public void testByteArrayManager() throws Exception {
|
|||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("randomRecycler start");
|
LOG.info("randomRecycler start");
|
||||||
for(int i = 0; shouldRun(); i++) {
|
for(int i = 0; shouldRun(); i++) {
|
||||||
final int j = DFSUtil.getRandom().nextInt(runners.length);
|
final int j = ThreadLocalRandom.current().nextInt(runners.length);
|
||||||
try {
|
try {
|
||||||
runners[j].recycle();
|
runners[j].recycle();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -440,7 +440,7 @@ Future<byte[]> submitAllocate() {
|
|||||||
public byte[] call() throws Exception {
|
public byte[] call() throws Exception {
|
||||||
final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
|
final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
|
||||||
0: maxArrayLength >> 1;
|
0: maxArrayLength >> 1;
|
||||||
final int arrayLength = DFSUtil.getRandom().nextInt(
|
final int arrayLength = ThreadLocalRandom.current().nextInt(
|
||||||
maxArrayLength - lower) + lower + 1;
|
maxArrayLength - lower) + lower + 1;
|
||||||
final byte[] array = bam.newByteArray(arrayLength);
|
final byte[] array = bam.newByteArray(arrayLength);
|
||||||
try {
|
try {
|
||||||
@ -496,7 +496,8 @@ public Integer call() throws Exception {
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
for(int i = 0; i < n; i++) {
|
for(int i = 0; i < n; i++) {
|
||||||
final boolean isAllocate = DFSUtil.getRandom().nextInt(NUM_RUNNERS) < p;
|
final boolean isAllocate = ThreadLocalRandom.current()
|
||||||
|
.nextInt(NUM_RUNNERS) < p;
|
||||||
if (isAllocate) {
|
if (isAllocate) {
|
||||||
submitAllocate();
|
submitAllocate();
|
||||||
} else {
|
} else {
|
||||||
@ -573,7 +574,6 @@ public static void main(String[] args) throws Exception {
|
|||||||
+ ", nAllocations=" + nAllocations
|
+ ", nAllocations=" + nAllocations
|
||||||
+ ", maxArrays=" + maxArrays);
|
+ ", maxArrays=" + maxArrays);
|
||||||
|
|
||||||
final Random ran = DFSUtil.getRandom();
|
|
||||||
final ByteArrayManager[] impls = {
|
final ByteArrayManager[] impls = {
|
||||||
new ByteArrayManager.NewByteArrayWithoutLimit(),
|
new ByteArrayManager.NewByteArrayWithoutLimit(),
|
||||||
new NewByteArrayWithLimit(maxArrays),
|
new NewByteArrayWithLimit(maxArrays),
|
||||||
@ -590,7 +590,7 @@ public static void main(String[] args) throws Exception {
|
|||||||
for(int j = 0; j < nTrials; j++) {
|
for(int j = 0; j < nTrials; j++) {
|
||||||
final int[] sleepTime = new int[nAllocations];
|
final int[] sleepTime = new int[nAllocations];
|
||||||
for(int k = 0; k < sleepTime.length; k++) {
|
for(int k = 0; k < sleepTime.length; k++) {
|
||||||
sleepTime[k] = ran.nextInt(100);
|
sleepTime[k] = ThreadLocalRandom.current().nextInt(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
|
final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
|
||||||
|
@ -22,8 +22,8 @@
|
|||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -61,13 +61,6 @@ class SharedCacheUploader implements Callable<Boolean> {
|
|||||||
new FsPermission((short)00555);
|
new FsPermission((short)00555);
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
|
private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
|
||||||
private static final ThreadLocal<Random> randomTl =
|
|
||||||
new ThreadLocal<Random>() {
|
|
||||||
@Override
|
|
||||||
protected Random initialValue() {
|
|
||||||
return new Random(System.nanoTime());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private final LocalResource resource;
|
private final LocalResource resource;
|
||||||
private final Path localPath;
|
private final Path localPath;
|
||||||
@ -267,7 +260,7 @@ String computeChecksum(Path path) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private String getTemporaryFileName(Path path) {
|
private String getTemporaryFileName(Path path) {
|
||||||
return path.getName() + "-" + randomTl.get().nextLong();
|
return path.getName() + "-" + ThreadLocalRandom.current().nextLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
Loading…
Reference in New Issue
Block a user