HDFS-15569. Speed up the Storage#doRecover during datanode rolling upgrade. Contributed by Hemanth Boyina.

This commit is contained in:
hemanthboyina 2020-12-22 17:27:31 +05:30
parent 4ae561bcb4
commit 16a20503ca

View File

@ -801,8 +801,7 @@ public void doRecover(StorageState curState) throws IOException {
case RECOVER_UPGRADE: // mv previous.tmp -> current case RECOVER_UPGRADE: // mv previous.tmp -> current
LOG.info("Recovering storage directory {} from previous upgrade", LOG.info("Recovering storage directory {} from previous upgrade",
rootPath); rootPath);
if (curDir.exists()) deleteAsync(curDir);
deleteDir(curDir);
rename(getPreviousTmp(), curDir); rename(getPreviousTmp(), curDir);
return; return;
case COMPLETE_ROLLBACK: // rm removed.tmp case COMPLETE_ROLLBACK: // rm removed.tmp
@ -818,21 +817,19 @@ public void doRecover(StorageState curState) throws IOException {
case COMPLETE_FINALIZE: // rm finalized.tmp case COMPLETE_FINALIZE: // rm finalized.tmp
LOG.info("Completing previous finalize for storage directory {}", LOG.info("Completing previous finalize for storage directory {}",
rootPath); rootPath);
deleteDir(getFinalizedTmp()); deleteAsync(getFinalizedTmp());
return; return;
case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
LOG.info("Completing previous checkpoint for storage directory {}", LOG.info("Completing previous checkpoint for storage directory {}",
rootPath); rootPath);
File prevCkptDir = getPreviousCheckpoint(); File prevCkptDir = getPreviousCheckpoint();
if (prevCkptDir.exists()) deleteAsync(prevCkptDir);
deleteDir(prevCkptDir);
rename(getLastCheckpointTmp(), prevCkptDir); rename(getLastCheckpointTmp(), prevCkptDir);
return; return;
case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current
LOG.info("Recovering storage directory {} from failed checkpoint", LOG.info("Recovering storage directory {} from failed checkpoint",
rootPath); rootPath);
if (curDir.exists()) deleteAsync(curDir);
deleteDir(curDir);
rename(getLastCheckpointTmp(), curDir); rename(getLastCheckpointTmp(), curDir);
return; return;
default: default:
@ -840,7 +837,30 @@ public void doRecover(StorageState curState) throws IOException {
+ " for storage directory: " + rootPath); + " for storage directory: " + rootPath);
} }
} }
/**
 * Renames {@code curDir} to a sibling directory named {@code <name>.tmp}
 * and deletes that sibling on a separate thread, so the caller does not
 * wait for the delete to complete.
 *
 * <p>Does nothing when {@code curDir} does not exist. If a
 * {@code <name>.tmp} directory is already present, it is deleted
 * synchronously before the rename.
 *
 * @param curDir the directory to remove
 * @throws IOException propagated from the synchronous {@code deleteDir} or
 *         {@code rename} calls; a failure of the background delete is only
 *         logged, never thrown to the caller
 */
private void deleteAsync(File curDir) throws IOException {
  if (!curDir.exists()) {
    return;
  }
  final File curTmp = new File(curDir.getParent(), curDir.getName() + ".tmp");
  if (curTmp.exists()) {
    // Clear the rename target first; done inline, before the thread starts.
    deleteDir(curTmp);
  }
  rename(curDir, curTmp);
  new Thread("Async Delete Current.tmp") {
    @Override
    public void run() {
      try {
        deleteDir(curTmp);
      } catch (IOException e) {
        // Pass the exception as the trailing argument so the cause and
        // stack trace are logged instead of being silently dropped.
        LOG.warn("Deleting storage directory {} failed", curTmp, e);
      }
    }
  }.start();
}
/** /**
* @return true if the storage directory should prompt the user prior * @return true if the storage directory should prompt the user prior
* to formatting (i.e if the directory appears to contain some data) * to formatting (i.e if the directory appears to contain some data)