HDFS-6355. Fix divide-by-zero, improper use of wall-clock time in BlockPoolSliceScanner (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-05-13 19:24:22 +00:00
parent f3c3d9e0c6
commit dac9028ef9
3 changed files with 26 additions and 22 deletions

View File

@ -466,6 +466,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn HDFS-6305. WebHdfs response decoding may throw RuntimeExceptions (Daryn
Sharp via jeagles) Sharp via jeagles)
HDFS-6355. Fix divide-by-zero, improper use of wall-clock time in
BlockPoolSliceScanner (cmccabe)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -97,7 +97,7 @@ class BlockPoolSliceScanner {
private long totalTransientErrors = 0; private long totalTransientErrors = 0;
private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
private long currentPeriodStart = Time.now(); private long currentPeriodStart = Time.monotonicNow();
private long bytesLeft = 0; // Bytes to scan in this period private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0; private long totalBytesToScan = 0;
private boolean isNewPeriod = true; private boolean isNewPeriod = true;
@ -260,7 +260,7 @@ private synchronized long getNewBlockScanTime() {
long period = Math.min(scanPeriod, long period = Math.min(scanPeriod,
Math.max(blockMap.size(),1) * 600 * 1000L); Math.max(blockMap.size(),1) * 600 * 1000L);
int periodInt = Math.abs((int)period); int periodInt = Math.abs((int)period);
return Time.now() - scanPeriod + return Time.monotonicNow() - scanPeriod +
DFSUtil.getRandom().nextInt(periodInt); DFSUtil.getRandom().nextInt(periodInt);
} }
@ -322,7 +322,7 @@ private synchronized void updateScanStatus(Block block,
info = new BlockScanInfo(block); info = new BlockScanInfo(block);
} }
long now = Time.now(); long now = Time.monotonicNow();
info.lastScanType = type; info.lastScanType = type;
info.lastScanTime = now; info.lastScanTime = now;
info.lastScanOk = scanOk; info.lastScanOk = scanOk;
@ -399,8 +399,9 @@ static LogEntry parseEntry(String line) {
} }
private synchronized void adjustThrottler() { private synchronized void adjustThrottler() {
long timeLeft = currentPeriodStart+scanPeriod - Time.now(); long timeLeft = Math.max(1L,
long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE); currentPeriodStart + scanPeriod - Time.monotonicNow());
long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE)); throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
} }
@ -523,7 +524,7 @@ int getBlocksScannedInLastRun() {
private boolean assignInitialVerificationTimes() { private boolean assignInitialVerificationTimes() {
//First updates the last verification times from the log file. //First updates the last verification times from the log file.
if (verificationLog != null) { if (verificationLog != null) {
long now = Time.now(); long now = Time.monotonicNow();
RollingLogs.LineIterator logIterator = null; RollingLogs.LineIterator logIterator = null;
try { try {
logIterator = verificationLog.logs.iterator(false); logIterator = verificationLog.logs.iterator(false);
@ -574,7 +575,7 @@ private boolean assignInitialVerificationTimes() {
// Initially spread the block reads over half of scan period // Initially spread the block reads over half of scan period
// so that we don't keep scanning the blocks too quickly when restarted. // so that we don't keep scanning the blocks too quickly when restarted.
long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L); long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
long lastScanTime = Time.now() - scanPeriod; long lastScanTime = Time.monotonicNow() - scanPeriod;
if (!blockInfoSet.isEmpty()) { if (!blockInfoSet.isEmpty()) {
BlockScanInfo info; BlockScanInfo info;
@ -601,16 +602,16 @@ private synchronized void startNewPeriod() {
// reset the byte counts : // reset the byte counts :
bytesLeft = totalBytesToScan; bytesLeft = totalBytesToScan;
currentPeriodStart = Time.now(); currentPeriodStart = Time.monotonicNow();
isNewPeriod = true; isNewPeriod = true;
} }
private synchronized boolean workRemainingInCurrentPeriod() { private synchronized boolean workRemainingInCurrentPeriod() {
if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) { if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" + LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
currentPeriodStart + ", period=" + scanPeriod + ", now=" + currentPeriodStart + ", period=" + scanPeriod + ", now=" +
Time.now() + " " + blockPoolId); Time.monotonicNow() + " " + blockPoolId);
} }
return false; return false;
} else { } else {
@ -633,7 +634,7 @@ void scanBlockPoolSlice() {
scan(); scan();
} finally { } finally {
totalBlocksScannedInLastRun.set(processedBlocks.size()); totalBlocksScannedInLastRun.set(processedBlocks.size());
lastScanTime.set(Time.now()); lastScanTime.set(Time.monotonicNow());
} }
} }
@ -656,7 +657,7 @@ private void scan() {
while (datanode.shouldRun while (datanode.shouldRun
&& !datanode.blockScanner.blockScannerThread.isInterrupted() && !datanode.blockScanner.blockScannerThread.isInterrupted()
&& datanode.isBPServiceAlive(blockPoolId)) { && datanode.isBPServiceAlive(blockPoolId)) {
long now = Time.now(); long now = Time.monotonicNow();
synchronized (this) { synchronized (this) {
if ( now >= (currentPeriodStart + scanPeriod)) { if ( now >= (currentPeriodStart + scanPeriod)) {
startNewPeriod(); startNewPeriod();
@ -714,7 +715,7 @@ synchronized void printBlockReport(StringBuilder buffer,
int total = blockInfoSet.size(); int total = blockInfoSet.size();
long now = Time.now(); long now = Time.monotonicNow();
Date date = new Date(); Date date = new Date();

View File

@ -87,15 +87,15 @@ private static long waitForVerification(int infoPort, FileSystem fs,
throws IOException, TimeoutException { throws IOException, TimeoutException {
URL url = new URL("http://localhost:" + infoPort + URL url = new URL("http://localhost:" + infoPort +
"/blockScannerReport?listblocks"); "/blockScannerReport?listblocks");
long lastWarnTime = Time.now(); long lastWarnTime = Time.monotonicNow();
if (newTime <= 0) newTime = 1L; if (newTime <= 0) newTime = 1L;
long verificationTime = 0; long verificationTime = 0;
String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName(); String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
long failtime = (timeout <= 0) ? Long.MAX_VALUE long failtime = (timeout <= 0) ? Long.MAX_VALUE
: Time.now() + timeout; : Time.monotonicNow() + timeout;
while (verificationTime < newTime) { while (verificationTime < newTime) {
if (failtime < Time.now()) { if (failtime < Time.monotonicNow()) {
throw new TimeoutException("failed to achieve block verification after " throw new TimeoutException("failed to achieve block verification after "
+ timeout + " msec. Current verification timestamp = " + timeout + " msec. Current verification timestamp = "
+ verificationTime + ", requested verification time > " + verificationTime + ", requested verification time > "
@ -118,7 +118,7 @@ private static long waitForVerification(int infoPort, FileSystem fs,
} }
if (verificationTime < newTime) { if (verificationTime < newTime) {
long now = Time.now(); long now = Time.monotonicNow();
if ((now - lastWarnTime) >= 5*1000) { if ((now - lastWarnTime) >= 5*1000) {
LOG.info("Waiting for verification of " + block); LOG.info("Waiting for verification of " + block);
lastWarnTime = now; lastWarnTime = now;
@ -134,7 +134,7 @@ private static long waitForVerification(int infoPort, FileSystem fs,
@Test @Test
public void testDatanodeBlockScanner() throws IOException, TimeoutException { public void testDatanodeBlockScanner() throws IOException, TimeoutException {
long startTime = Time.now(); long startTime = Time.monotonicNow();
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -344,7 +344,7 @@ public void testTruncatedBlockReport() throws Exception {
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
long startTime = Time.now(); long startTime = Time.monotonicNow();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPLICATION_FACTOR) .numDataNodes(REPLICATION_FACTOR)
.build(); .build();
@ -428,10 +428,10 @@ static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex,
private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex, private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
long timeout) throws TimeoutException, InterruptedException { long timeout) throws TimeoutException, InterruptedException {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk); File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
long failtime = Time.now() long failtime = Time.monotonicNow()
+ ((timeout > 0) ? timeout : Long.MAX_VALUE); + ((timeout > 0) ? timeout : Long.MAX_VALUE);
while (blockFile != null && blockFile.exists()) { while (blockFile != null && blockFile.exists()) {
if (failtime < Time.now()) { if (failtime < Time.monotonicNow()) {
throw new TimeoutException("waited too long for blocks to be deleted: " throw new TimeoutException("waited too long for blocks to be deleted: "
+ blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; ")); + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
} }
@ -462,7 +462,7 @@ private static void testReplicaInfoParsingSingle(String subDirPath, int[] expect
@Test @Test
public void testDuplicateScans() throws Exception { public void testDuplicateScans() throws Exception {
long startTime = Time.now(); long startTime = Time.monotonicNow();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()) MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
.numDataNodes(1).build(); .numDataNodes(1).build();
FileSystem fs = null; FileSystem fs = null;