HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.
This commit is contained in:
parent
7366e42563
commit
7817674a3a
@ -822,6 +822,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-7401. Add block info to DFSInputStream' WARN message when it adds
|
HDFS-7401. Add block info to DFSInputStream' WARN message when it adds
|
||||||
node to deadNodes (Arshad Mohammad via vinayakumarb)
|
node to deadNodes (Arshad Mohammad via vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
|
||||||
|
(Ming Ma via jing9)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1887,7 +1887,6 @@ private GetBlockLocationsResult getBlockLocationsInt(
|
|||||||
*/
|
*/
|
||||||
void concat(String target, String [] srcs, boolean logRetryCache)
|
void concat(String target, String [] srcs, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
HdfsFileStatus stat = null;
|
HdfsFileStatus stat = null;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
@ -2162,7 +2161,6 @@ void createSymlink(String target, String link,
|
|||||||
}
|
}
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
HdfsFileStatus auditStat = null;
|
HdfsFileStatus auditStat = null;
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
@ -2379,7 +2377,6 @@ private HdfsFileStatus startFileInt(final String src,
|
|||||||
throw new InvalidPathException(src);
|
throw new InvalidPathException(src);
|
||||||
}
|
}
|
||||||
blockManager.verifyReplication(src, replication, clientMachine);
|
blockManager.verifyReplication(src, replication, clientMachine);
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
if (blockSize < minBlockSize) {
|
if (blockSize < minBlockSize) {
|
||||||
throw new IOException("Specified block size is less than configured" +
|
throw new IOException("Specified block size is less than configured" +
|
||||||
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
|
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
|
||||||
@ -2786,7 +2783,6 @@ private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
|
|||||||
LocatedBlock lb = null;
|
LocatedBlock lb = null;
|
||||||
HdfsFileStatus stat = null;
|
HdfsFileStatus stat = null;
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -3081,7 +3077,6 @@ boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
|
|||||||
boolean renameTo(String src, String dst, boolean logRetryCache)
|
boolean renameTo(String src, String dst, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
FSDirRenameOp.RenameOldResult ret = null;
|
FSDirRenameOp.RenameOldResult ret = null;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -3107,7 +3102,6 @@ void renameTo(final String src, final String dst,
|
|||||||
boolean logRetryCache, Options.Rename... options)
|
boolean logRetryCache, Options.Rename... options)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
|
Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -3144,7 +3138,6 @@ void renameTo(final String src, final String dst,
|
|||||||
boolean delete(String src, boolean recursive, boolean logRetryCache)
|
boolean delete(String src, boolean recursive, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
BlocksMapUpdateInfo toRemovedBlocks = null;
|
BlocksMapUpdateInfo toRemovedBlocks = null;
|
||||||
writeLock();
|
writeLock();
|
||||||
boolean ret = false;
|
boolean ret = false;
|
||||||
@ -5762,8 +5755,6 @@ void updatePipeline(
|
|||||||
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
|
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
|
||||||
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
|
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
|
|
||||||
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
|
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
|
||||||
+ ", newGS=" + newBlock.getGenerationStamp()
|
+ ", newGS=" + newBlock.getGenerationStamp()
|
||||||
+ ", newLength=" + newBlock.getNumBytes()
|
+ ", newLength=" + newBlock.getNumBytes()
|
||||||
@ -6744,7 +6735,6 @@ String createSnapshot(String snapshotRoot, String snapshotName,
|
|||||||
void renameSnapshot(
|
void renameSnapshot(
|
||||||
String path, String snapshotOldName, String snapshotNewName,
|
String path, String snapshotOldName, String snapshotNewName,
|
||||||
boolean logRetryCache) throws IOException {
|
boolean logRetryCache) throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -6832,7 +6822,6 @@ SnapshotDiffReport getSnapshotDiffReport(String path,
|
|||||||
*/
|
*/
|
||||||
void deleteSnapshot(String snapshotRoot, String snapshotName,
|
void deleteSnapshot(String snapshotRoot, String snapshotName,
|
||||||
boolean logRetryCache) throws IOException {
|
boolean logRetryCache) throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
writeLock();
|
writeLock();
|
||||||
BlocksMapUpdateInfo blocksToBeDeleted = null;
|
BlocksMapUpdateInfo blocksToBeDeleted = null;
|
||||||
@ -7057,7 +7046,6 @@ void finalizeRollingUpgradeInternal(long finalizeTime) {
|
|||||||
long addCacheDirective(CacheDirectiveInfo directive,
|
long addCacheDirective(CacheDirectiveInfo directive,
|
||||||
EnumSet<CacheFlag> flags, boolean logRetryCache)
|
EnumSet<CacheFlag> flags, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
CacheDirectiveInfo effectiveDirective = null;
|
CacheDirectiveInfo effectiveDirective = null;
|
||||||
if (!flags.contains(CacheFlag.FORCE)) {
|
if (!flags.contains(CacheFlag.FORCE)) {
|
||||||
cacheManager.waitForRescanIfNeeded();
|
cacheManager.waitForRescanIfNeeded();
|
||||||
@ -7085,7 +7073,6 @@ long addCacheDirective(CacheDirectiveInfo directive,
|
|||||||
|
|
||||||
void modifyCacheDirective(CacheDirectiveInfo directive,
|
void modifyCacheDirective(CacheDirectiveInfo directive,
|
||||||
EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
|
EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
if (!flags.contains(CacheFlag.FORCE)) {
|
if (!flags.contains(CacheFlag.FORCE)) {
|
||||||
cacheManager.waitForRescanIfNeeded();
|
cacheManager.waitForRescanIfNeeded();
|
||||||
@ -7109,7 +7096,6 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
|
void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -7148,7 +7134,6 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
|
|||||||
|
|
||||||
void addCachePool(CachePoolInfo req, boolean logRetryCache)
|
void addCachePool(CachePoolInfo req, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
writeLock();
|
writeLock();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
String poolInfoStr = null;
|
String poolInfoStr = null;
|
||||||
@ -7170,7 +7155,6 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
|
|||||||
|
|
||||||
void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
|
void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
writeLock();
|
writeLock();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
@ -7192,7 +7176,6 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
|
|||||||
|
|
||||||
void removeCachePool(String cachePoolName, boolean logRetryCache)
|
void removeCachePool(String cachePoolName, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
writeLock();
|
writeLock();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
@ -7387,7 +7370,6 @@ private void createEncryptionZoneInt(final String srcArg, String cipher,
|
|||||||
String src = srcArg;
|
String src = srcArg;
|
||||||
HdfsFileStatus resultingStat = null;
|
HdfsFileStatus resultingStat = null;
|
||||||
checkSuperuserPrivilege();
|
checkSuperuserPrivilege();
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
final byte[][] pathComponents =
|
final byte[][] pathComponents =
|
||||||
FSDirectory.getPathComponentsForReservedPath(src);
|
FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
FSPermissionChecker pc = getPermissionChecker();
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
@ -7473,7 +7455,6 @@ BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
|
|||||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
||||||
boolean logRetryCache)
|
boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
HdfsFileStatus auditStat = null;
|
HdfsFileStatus auditStat = null;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -7521,7 +7502,6 @@ List<XAttr> listXAttrs(String src) throws IOException {
|
|||||||
|
|
||||||
void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
|
void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOperation(OperationCategory.WRITE);
|
|
||||||
HdfsFileStatus auditStat = null;
|
HdfsFileStatus auditStat = null;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
|
@ -616,7 +616,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
|
|||||||
throw new IOException("create: Pathname too long. Limit "
|
throw new IOException("create: Pathname too long. Limit "
|
||||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
|
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return (HdfsFileStatus) cacheEntry.getPayload();
|
return (HdfsFileStatus) cacheEntry.getPayload();
|
||||||
@ -647,6 +647,7 @@ public LastBlockWithStatus append(String src, String clientName,
|
|||||||
stateChangeLog.debug("*DIR* NameNode.append: file "
|
stateChangeLog.debug("*DIR* NameNode.append: file "
|
||||||
+src+" for "+clientName+" at "+clientMachine);
|
+src+" for "+clientName+" at "+clientMachine);
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||||
null);
|
null);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
@ -794,6 +795,7 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
|
|||||||
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
|
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -838,7 +840,7 @@ public boolean rename(String src, String dst) throws IOException {
|
|||||||
throw new IOException("rename: Pathname too long. Limit "
|
throw new IOException("rename: Pathname too long. Limit "
|
||||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return true; // Return previous response
|
return true; // Return previous response
|
||||||
@ -859,6 +861,7 @@ public boolean rename(String src, String dst) throws IOException {
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void concat(String trg, String[] src) throws IOException {
|
public void concat(String trg, String[] src) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -884,6 +887,7 @@ public void rename2(String src, String dst, Options.Rename... options)
|
|||||||
throw new IOException("rename: Pathname too long. Limit "
|
throw new IOException("rename: Pathname too long. Limit "
|
||||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -922,6 +926,7 @@ public boolean delete(String src, boolean recursive) throws IOException {
|
|||||||
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
|
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
|
||||||
+ ", recursive=" + recursive);
|
+ ", recursive=" + recursive);
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return true; // Return previous response
|
return true; // Return previous response
|
||||||
@ -1207,6 +1212,7 @@ public void setTimes(String src, long mtime, long atime)
|
|||||||
public void createSymlink(String target, String link, FsPermission dirPerms,
|
public void createSymlink(String target, String link, FsPermission dirPerms,
|
||||||
boolean createParent) throws IOException {
|
boolean createParent) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -1537,6 +1543,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
|
|||||||
throw new IOException("createSnapshot: Pathname too long. Limit "
|
throw new IOException("createSnapshot: Pathname too long. Limit "
|
||||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||||
null);
|
null);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
@ -1558,6 +1565,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
|
|||||||
public void deleteSnapshot(String snapshotRoot, String snapshotName)
|
public void deleteSnapshot(String snapshotRoot, String snapshotName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
metrics.incrDeleteSnapshotOps();
|
metrics.incrDeleteSnapshotOps();
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
@ -1596,6 +1604,7 @@ public void renameSnapshot(String snapshotRoot, String snapshotOldName,
|
|||||||
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
|
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
|
||||||
throw new IOException("The new snapshot name is null or empty.");
|
throw new IOException("The new snapshot name is null or empty.");
|
||||||
}
|
}
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
metrics.incrRenameSnapshotOps();
|
metrics.incrRenameSnapshotOps();
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
@ -1635,6 +1644,7 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
|||||||
public long addCacheDirective(
|
public long addCacheDirective(
|
||||||
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
|
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
|
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
|
||||||
(retryCache, null);
|
(retryCache, null);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
@ -1656,6 +1666,7 @@ public long addCacheDirective(
|
|||||||
public void modifyCacheDirective(
|
public void modifyCacheDirective(
|
||||||
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
|
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return;
|
return;
|
||||||
@ -1673,6 +1684,7 @@ public void modifyCacheDirective(
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void removeCacheDirective(long id) throws IOException {
|
public void removeCacheDirective(long id) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return;
|
return;
|
||||||
@ -1699,6 +1711,7 @@ public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
|
|||||||
@Override //ClientProtocol
|
@Override //ClientProtocol
|
||||||
public void addCachePool(CachePoolInfo info) throws IOException {
|
public void addCachePool(CachePoolInfo info) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -1715,6 +1728,7 @@ public void addCachePool(CachePoolInfo info) throws IOException {
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void modifyCachePool(CachePoolInfo info) throws IOException {
|
public void modifyCachePool(CachePoolInfo info) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -1731,6 +1745,7 @@ public void modifyCachePool(CachePoolInfo info) throws IOException {
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void removeCachePool(String cachePoolName) throws IOException {
|
public void removeCachePool(String cachePoolName) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return;
|
return;
|
||||||
@ -1793,6 +1808,7 @@ public AclStatus getAclStatus(String src) throws IOException {
|
|||||||
public void createEncryptionZone(String src, String keyName)
|
public void createEncryptionZone(String src, String keyName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return;
|
return;
|
||||||
@ -1824,6 +1840,7 @@ public BatchedEntries<EncryptionZone> listEncryptionZones(
|
|||||||
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
@ -1853,6 +1870,7 @@ public List<XAttr> listXAttrs(String src) throws IOException {
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public void removeXAttr(String src, XAttr xAttr) throws IOException {
|
public void removeXAttr(String src, XAttr xAttr) throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
|
namesystem.checkOperation(OperationCategory.WRITE);
|
||||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||||
return; // Return previous response
|
return; // Return previous response
|
||||||
|
@ -215,7 +215,8 @@ private DFSClient genClientWithDummyHandler() throws IOException {
|
|||||||
abstract class AtMostOnceOp {
|
abstract class AtMostOnceOp {
|
||||||
private final String name;
|
private final String name;
|
||||||
final DFSClient client;
|
final DFSClient client;
|
||||||
|
int expectedUpdateCount = 0;
|
||||||
|
|
||||||
AtMostOnceOp(String name, DFSClient client) {
|
AtMostOnceOp(String name, DFSClient client) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
@ -225,6 +226,9 @@ abstract class AtMostOnceOp {
|
|||||||
abstract void invoke() throws Exception;
|
abstract void invoke() throws Exception;
|
||||||
abstract boolean checkNamenodeBeforeReturn() throws Exception;
|
abstract boolean checkNamenodeBeforeReturn() throws Exception;
|
||||||
abstract Object getResult();
|
abstract Object getResult();
|
||||||
|
int getExpectedCacheUpdateCount() {
|
||||||
|
return expectedUpdateCount;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** createSnapshot operaiton */
|
/** createSnapshot operaiton */
|
||||||
@ -604,7 +608,7 @@ Object getResult() {
|
|||||||
class DeleteOp extends AtMostOnceOp {
|
class DeleteOp extends AtMostOnceOp {
|
||||||
private final String target;
|
private final String target;
|
||||||
private boolean deleted;
|
private boolean deleted;
|
||||||
|
|
||||||
DeleteOp(DFSClient client, String target) {
|
DeleteOp(DFSClient client, String target) {
|
||||||
super("delete", client);
|
super("delete", client);
|
||||||
this.target = target;
|
this.target = target;
|
||||||
@ -614,12 +618,14 @@ class DeleteOp extends AtMostOnceOp {
|
|||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
Path p = new Path(target);
|
Path p = new Path(target);
|
||||||
if (!dfs.exists(p)) {
|
if (!dfs.exists(p)) {
|
||||||
|
expectedUpdateCount++;
|
||||||
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
deleted = client.delete(target, true);
|
deleted = client.delete(target, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -655,12 +661,14 @@ public CreateSymlinkOp(DFSClient client, String target, String link) {
|
|||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
Path p = new Path(target);
|
Path p = new Path(target);
|
||||||
if (!dfs.exists(p)) {
|
if (!dfs.exists(p)) {
|
||||||
|
expectedUpdateCount++;
|
||||||
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.createSymlink(target, link, false);
|
client.createSymlink(target, link, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -773,11 +781,13 @@ class AddCacheDirectiveInfoOp extends AtMostOnceOp {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -819,12 +829,15 @@ class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
||||||
|
expectedUpdateCount++;
|
||||||
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.modifyCacheDirective(
|
client.modifyCacheDirective(
|
||||||
new CacheDirectiveInfo.Builder().
|
new CacheDirectiveInfo.Builder().
|
||||||
setId(id).
|
setId(id).
|
||||||
@ -875,12 +888,15 @@ class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
|
||||||
|
expectedUpdateCount++;
|
||||||
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.removeCacheDirective(id);
|
client.removeCacheDirective(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -922,6 +938,7 @@ void prepare() throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.addCachePool(new CachePoolInfo(pool));
|
client.addCachePool(new CachePoolInfo(pool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -954,11 +971,13 @@ class ModifyCachePoolOp extends AtMostOnceOp {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
|
client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
|
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -991,11 +1010,13 @@ class RemoveCachePoolOp extends AtMostOnceOp {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.addCachePool(new CachePoolInfo(pool));
|
client.addCachePool(new CachePoolInfo(pool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.removeCachePool(pool);
|
client.removeCachePool(pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1030,12 +1051,14 @@ class SetXAttrOp extends AtMostOnceOp {
|
|||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
Path p = new Path(src);
|
Path p = new Path(src);
|
||||||
if (!dfs.exists(p)) {
|
if (!dfs.exists(p)) {
|
||||||
|
expectedUpdateCount++;
|
||||||
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.setXAttr(src, "user.key", "value".getBytes(),
|
client.setXAttr(src, "user.key", "value".getBytes(),
|
||||||
EnumSet.of(XAttrSetFlag.CREATE));
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
}
|
}
|
||||||
@ -1072,7 +1095,9 @@ class RemoveXAttrOp extends AtMostOnceOp {
|
|||||||
void prepare() throws Exception {
|
void prepare() throws Exception {
|
||||||
Path p = new Path(src);
|
Path p = new Path(src);
|
||||||
if (!dfs.exists(p)) {
|
if (!dfs.exists(p)) {
|
||||||
|
expectedUpdateCount++;
|
||||||
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
|
||||||
|
expectedUpdateCount++;
|
||||||
client.setXAttr(src, "user.key", "value".getBytes(),
|
client.setXAttr(src, "user.key", "value".getBytes(),
|
||||||
EnumSet.of(XAttrSetFlag.CREATE));
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
}
|
}
|
||||||
@ -1080,6 +1105,7 @@ void prepare() throws Exception {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void invoke() throws Exception {
|
void invoke() throws Exception {
|
||||||
|
expectedUpdateCount++;
|
||||||
client.removeXAttr(src, "user.key");
|
client.removeXAttr(src, "user.key");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1316,6 +1342,13 @@ public void run() {
|
|||||||
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
|
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
|
||||||
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
|
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
|
||||||
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
|
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
|
||||||
|
long expectedUpdateCount = op.getExpectedCacheUpdateCount();
|
||||||
|
if (expectedUpdateCount > 0) {
|
||||||
|
assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
|
||||||
|
updatedNN0);
|
||||||
|
assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
|
||||||
|
updatedNN1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user