HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to address case of potential data loss due to partial copy. Contributed by Gaurav Kanade.
This commit is contained in:
parent
67c9780609
commit
978bbdfeb2
@ -1553,6 +1553,10 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-12689. S3 filesystem operations stopped working correctly
|
||||
(Matt Paduano via raviprak)
|
||||
|
||||
HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to
|
||||
address case of potential data loss due to partial copy
|
||||
(Gaurav Kanade via cnauroth)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -536,45 +536,13 @@ private void finishSingleFileRename(String fileName)
|
||||
Path dstFile = fullPath(dstKey, fileName);
|
||||
boolean srcExists = fs.exists(srcFile);
|
||||
boolean dstExists = fs.exists(dstFile);
|
||||
if (srcExists && !dstExists) {
|
||||
|
||||
if(srcExists) {
|
||||
// Rename gets exclusive access (via a lease) for HBase write-ahead log
|
||||
// (WAL) file processing correctness. See the rename code for details.
|
||||
String srcName = fs.pathToKey(srcFile);
|
||||
String dstName = fs.pathToKey(dstFile);
|
||||
fs.getStoreInterface().rename(srcName, dstName, true, null);
|
||||
} else if (srcExists && dstExists) {
|
||||
|
||||
// Get a lease on source to block write access.
|
||||
String srcName = fs.pathToKey(srcFile);
|
||||
SelfRenewingLease lease = null;
|
||||
try {
|
||||
lease = fs.acquireLease(srcFile);
|
||||
// Delete the file. This will free the lease too.
|
||||
fs.getStoreInterface().delete(srcName, lease);
|
||||
} catch(AzureException e) {
|
||||
String errorCode = "";
|
||||
try {
|
||||
StorageException e2 = (StorageException) e.getCause();
|
||||
errorCode = e2.getErrorCode();
|
||||
} catch(Exception e3) {
|
||||
// do nothing if cast fails
|
||||
}
|
||||
// If the rename already finished do nothing
|
||||
if(!errorCode.equals("BlobNotFound")){
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(lease != null){
|
||||
lease.free();
|
||||
}
|
||||
} catch(StorageException e) {
|
||||
LOG.warn("Unable to free lease because: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
} else if (!srcExists && dstExists) {
|
||||
|
||||
// The rename already finished, so do nothing.
|
||||
;
|
||||
} else {
|
||||
|
@ -24,6 +24,8 @@
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
@ -43,6 +45,26 @@ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
return AzureBlobStorageTestAccount.create();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLazyRenamePendingCanOverwriteExistingFile()
|
||||
throws Exception {
|
||||
final String SRC_FILE_KEY = "srcFile";
|
||||
final String DST_FILE_KEY = "dstFile";
|
||||
Path srcPath = new Path(SRC_FILE_KEY);
|
||||
FSDataOutputStream srcStream = fs.create(srcPath);
|
||||
assertTrue(fs.exists(srcPath));
|
||||
Path dstPath = new Path(DST_FILE_KEY);
|
||||
FSDataOutputStream dstStream = fs.create(dstPath);
|
||||
assertTrue(fs.exists(dstPath));
|
||||
NativeAzureFileSystem nfs = (NativeAzureFileSystem)fs;
|
||||
final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
|
||||
final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
|
||||
nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
|
||||
assertTrue(fs.exists(dstPath));
|
||||
assertFalse(fs.exists(srcPath));
|
||||
IOUtils.cleanup(null, srcStream);
|
||||
IOUtils.cleanup(null, dstStream);
|
||||
}
|
||||
/**
|
||||
* Tests fs.delete() function to delete a blob when another blob is holding a
|
||||
* lease on it. Delete if called without a lease should fail if another process
|
||||
|
Loading…
Reference in New Issue
Block a user