MAPREDUCE-6718. add progress log to JHS during startup (haibochen via rkanter)
This commit is contained in:
parent
bcb2528a51
commit
0d6778d800
@ -302,6 +302,10 @@ public HistoryFileInfo get(JobId jobId) {
|
|||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
return cache.size() >= maxSize;
|
return cache.size() >= maxSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return cache.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -612,6 +616,9 @@ void createHistoryDirs(Clock clock, long intervalCheckMillis,
|
|||||||
while (!done &&
|
while (!done &&
|
||||||
((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
|
((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
|
||||||
done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
|
done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
|
||||||
|
if (done) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(intervalCheckMillis);
|
Thread.sleep(intervalCheckMillis);
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
@ -760,15 +767,29 @@ void initExisting() throws IOException {
|
|||||||
List<FileStatus> timestampedDirList = findTimestampedDirectories();
|
List<FileStatus> timestampedDirList = findTimestampedDirectories();
|
||||||
// Sort first just so insertion is in a consistent order
|
// Sort first just so insertion is in a consistent order
|
||||||
Collections.sort(timestampedDirList);
|
Collections.sort(timestampedDirList);
|
||||||
|
LOG.info("Found " + timestampedDirList.size() + " directories to load");
|
||||||
for (FileStatus fs : timestampedDirList) {
|
for (FileStatus fs : timestampedDirList) {
|
||||||
// TODO Could verify the correct format for these directories.
|
// TODO Could verify the correct format for these directories.
|
||||||
addDirectoryToSerialNumberIndex(fs.getPath());
|
addDirectoryToSerialNumberIndex(fs.getPath());
|
||||||
}
|
}
|
||||||
|
final double maxCacheSize = (double) jobListCache.maxSize;
|
||||||
|
int prevCacheSize = jobListCache.size();
|
||||||
for (int i= timestampedDirList.size() - 1;
|
for (int i= timestampedDirList.size() - 1;
|
||||||
i >= 0 && !jobListCache.isFull(); i--) {
|
i >= 0 && !jobListCache.isFull(); i--) {
|
||||||
FileStatus fs = timestampedDirList.get(i);
|
FileStatus fs = timestampedDirList.get(i);
|
||||||
addDirectoryToJobListCache(fs.getPath());
|
addDirectoryToJobListCache(fs.getPath());
|
||||||
|
|
||||||
|
int currCacheSize = jobListCache.size();
|
||||||
|
if((currCacheSize - prevCacheSize)/maxCacheSize >= 0.05) {
|
||||||
|
LOG.info(currCacheSize * 100.0 / maxCacheSize +
|
||||||
|
"% of cache is loaded.");
|
||||||
}
|
}
|
||||||
|
prevCacheSize = currCacheSize;
|
||||||
|
}
|
||||||
|
final double loadedPercent = maxCacheSize == 0.0 ?
|
||||||
|
100 : prevCacheSize * 100.0 / maxCacheSize;
|
||||||
|
LOG.info("Existing job initialization finished. " +
|
||||||
|
loadedPercent + "% of cache is occupied.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
|
private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
|
||||||
|
Loading…
Reference in New Issue
Block a user