From 7817674a3a4d097b647dd77f1345787dd376d5ea Mon Sep 17 00:00:00 2001
From: Jing Zhao <jing9@apache.org>
Date: Fri, 29 May 2015 11:05:13 -0700
Subject: [PATCH] HDFS-7609. Avoid retry cache collision when Standby NameNode
 loading edits. Contributed by Ming Ma.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 .../hdfs/server/namenode/FSNamesystem.java    | 20 ----------
 .../server/namenode/NameNodeRpcServer.java    | 22 ++++++++++-
 .../namenode/ha/TestRetryCacheWithHA.java     | 37 ++++++++++++++++++-
 4 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 23699a38b3..cc8235c5fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -822,6 +822,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7401. Add block info to DFSInputStream' WARN message when it adds
     node to deadNodes (Arshad Mohammad via vinayakumarb)
 
+    HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
+    (Ming Ma via jing9)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 798f8d5049..5ed069d514 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1887,7 +1887,6 @@ private GetBlockLocationsResult getBlockLocationsInt(
    */
   void concat(String target, String [] srcs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
     HdfsFileStatus stat = null;
     boolean success = false;
@@ -2162,7 +2161,6 @@ void createSymlink(String target, String link,
     }
     waitForLoadingFSImage();
     HdfsFileStatus auditStat = null;
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2379,7 +2377,6 @@ private HdfsFileStatus startFileInt(final String src,
       throw new InvalidPathException(src);
     }
     blockManager.verifyReplication(src, replication, clientMachine);
-    checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@@ -2786,7 +2783,6 @@ private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
     LocatedBlock lb = null;
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
@@ -3081,7 +3077,6 @@ boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
   boolean renameTo(String src, String dst, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     FSDirRenameOp.RenameOldResult ret = null;
     writeLock();
     try {
@@ -3107,7 +3102,6 @@ void renameTo(final String src, final String dst,
       boolean logRetryCache, Options.Rename... options)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
     writeLock();
     try {
@@ -3144,7 +3138,6 @@ void renameTo(final String src, final String dst,
   boolean delete(String src, boolean recursive, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     BlocksMapUpdateInfo toRemovedBlocks = null;
     writeLock();
     boolean ret = false;
@@ -5762,8 +5755,6 @@ void updatePipeline(
       String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
       DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
     LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
              + ", newGS=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -6744,7 +6735,6 @@ String createSnapshot(String snapshotRoot, String snapshotName,
   void renameSnapshot(
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -6832,7 +6822,6 @@ SnapshotDiffReport getSnapshotDiffReport(String path,
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
@@ -7057,7 +7046,6 @@ void finalizeRollingUpgradeInternal(long finalizeTime) {
   long addCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     CacheDirectiveInfo effectiveDirective = null;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7085,7 +7073,6 @@ long addCacheDirective(CacheDirectiveInfo directive,
   void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7109,7 +7096,6 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
   }
 
   void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -7148,7 +7134,6 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
 
   void addCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     String poolInfoStr = null;
@@ -7170,7 +7155,6 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
 
   void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -7192,7 +7176,6 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
 
   void removeCachePool(String cachePoolName, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -7387,7 +7370,6 @@ private void createEncryptionZoneInt(final String srcArg, String cipher,
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     checkSuperuserPrivilege();
-    checkOperation(OperationCategory.WRITE);
     final byte[][] pathComponents =
       FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -7473,7 +7455,6 @@ BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
 
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {
@@ -7521,7 +7502,6 @@ List<XAttr> listXAttrs(String src) throws IOException {
 
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0d416a6eb2..dafa23e5e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -616,7 +616,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
       throw new IOException("create: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (HdfsFileStatus) cacheEntry.getPayload();
@@ -647,6 +647,7 @@ public LastBlockWithStatus append(String src, String clientName,
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -794,6 +795,7 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -838,7 +840,7 @@ public boolean rename(String src, String dst) throws IOException {
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -859,6 +861,7 @@ public boolean rename(String src, String dst) throws IOException {
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -884,6 +887,7 @@ public void rename2(String src, String dst, Options.Rename... options)
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -922,6 +926,7 @@ public boolean delete(String src, boolean recursive) throws IOException {
       stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
           + ", recursive=" + recursive);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -1207,6 +1212,7 @@ public void setTimes(String src, long mtime, long atime)
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1537,6 +1543,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
       throw new IOException("createSnapshot: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1558,6 +1565,7 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrDeleteSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1596,6 +1604,7 @@ public void renameSnapshot(String snapshotRoot, String snapshotOldName,
     if (snapshotNewName == null || snapshotNewName.isEmpty()) {
       throw new IOException("The new snapshot name is null or empty.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrRenameSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1635,6 +1644,7 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
   public long addCacheDirective(
       CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
       (retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1656,6 +1666,7 @@ public long addCacheDirective(
   public void modifyCacheDirective(
       CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1673,6 +1684,7 @@ public void modifyCacheDirective(
   @Override // ClientProtocol
   public void removeCacheDirective(long id) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1699,6 +1711,7 @@ public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
   @Override //ClientProtocol
   public void addCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1715,6 +1728,7 @@ public void addCachePool(CachePoolInfo info) throws IOException {
   @Override // ClientProtocol
   public void modifyCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1731,6 +1745,7 @@ public void modifyCachePool(CachePoolInfo info) throws IOException {
   @Override // ClientProtocol
   public void removeCachePool(String cachePoolName) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1793,6 +1808,7 @@ public AclStatus getAclStatus(String src) throws IOException {
   public void createEncryptionZone(String src, String keyName)
     throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1824,6 +1840,7 @@ public BatchedEntries<EncryptionZone> listEncryptionZones(
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1853,6 +1870,7 @@ public List<XAttr> listXAttrs(String src) throws IOException {
   @Override // ClientProtocol
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index e3572abde5..d202fb788f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -215,7 +215,8 @@ private DFSClient genClientWithDummyHandler() throws IOException {
   abstract class AtMostOnceOp {
     private final String name;
     final DFSClient client;
-
+    int expectedUpdateCount = 0;
+
     AtMostOnceOp(String name, DFSClient client) {
       this.name = name;
       this.client = client;
@@ -225,6 +226,9 @@ abstract class AtMostOnceOp {
     abstract void invoke() throws Exception;
     abstract boolean checkNamenodeBeforeReturn() throws Exception;
     abstract Object getResult();
+    int getExpectedCacheUpdateCount() {
+      return expectedUpdateCount;
+    }
   }
 
   /** createSnapshot operaiton */
@@ -604,7 +608,7 @@ Object getResult() {
   class DeleteOp extends AtMostOnceOp {
     private final String target;
     private boolean deleted;
-
+
     DeleteOp(DFSClient client, String target) {
       super("delete", client);
       this.target = target;
@@ -614,12 +618,14 @@ class DeleteOp extends AtMostOnceOp {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       deleted = client.delete(target, true);
     }
 
@@ -655,12 +661,14 @@ public CreateSymlinkOp(DFSClient client, String target, String link) {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.createSymlink(target, link, false);
     }
 
@@ -773,11 +781,13 @@ class AddCacheDirectiveInfoOp extends AtMostOnceOp {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       result = client.addCacheDirective(directive,
           EnumSet.of(CacheFlag.FORCE));
     }
@@ -819,12 +829,15 @@ class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCacheDirective(
           new CacheDirectiveInfo.Builder().
               setId(id).
@@ -875,12 +888,15 @@ class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCacheDirective(id);
     }
 
@@ -922,6 +938,7 @@ void prepare() throws Exception {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
@@ -954,11 +971,13 @@ class ModifyCachePoolOp extends AtMostOnceOp {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
     }
 
@@ -991,11 +1010,13 @@ class RemoveCachePoolOp extends AtMostOnceOp {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCachePool(pool);
     }
 
@@ -1030,12 +1051,14 @@ class SetXAttrOp extends AtMostOnceOp {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.setXAttr(src, "user.key", "value".getBytes(),
           EnumSet.of(XAttrSetFlag.CREATE));
     }
@@ -1072,7 +1095,9 @@ class RemoveXAttrOp extends AtMostOnceOp {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+        expectedUpdateCount++;
         client.setXAttr(src, "user.key", "value".getBytes(),
             EnumSet.of(XAttrSetFlag.CREATE));
       }
@@ -1080,6 +1105,7 @@ void prepare() throws Exception {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeXAttr(src, "user.key");
     }
 
@@ -1316,6 +1342,13 @@ public void run() {
     assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
     // Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
     assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
+    long expectedUpdateCount = op.getExpectedCacheUpdateCount();
+    if (expectedUpdateCount > 0) {
+      assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
+          updatedNN0);
+      assertEquals("CacheUpdated on NN1: " + updatedNN1, expectedUpdateCount,
+          updatedNN1);
+    }
   }
 
   /**
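
Note on the pattern this patch applies: each mutating RPC in NameNodeRpcServer
now calls namesystem.checkOperation(OperationCategory.WRITE) before
RetryCache.waitForCompletion(...), instead of relying on the check that
previously sat inside the FSNamesystem methods. On a standby NameNode the
check throws StandbyException immediately, so a retried client call never
creates or consults a retry-cache entry that could collide with the entries
the standby itself adds while replaying the active's edit log. The following
is a minimal, self-contained sketch of that ordering; it is illustrative
pseudocode, and every class and field name in it is invented for the example
rather than taken from Hadoop:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class RetryCacheOrderingSketch {
      enum HAState { ACTIVE, STANDBY }

      static class StandbyException extends IOException {}

      static class CacheEntry { boolean success; }

      HAState state = HAState.STANDBY;
      // Stands in for the NameNode's RetryCache, keyed by (clientId, callId).
      Map<Long, CacheEntry> retryCache = new HashMap<>();

      // Stands in for FSNamesystem#checkOperation(OperationCategory.WRITE).
      void checkOperation() throws StandbyException {
        if (state != HAState.ACTIVE) {
          throw new StandbyException(); // fail fast; retry cache untouched
        }
      }

      void writeRpc(long callId) throws IOException {
        // HDFS-7609: reject standby traffic before the retry-cache lookup.
        // Before the patch this check ran inside the namesystem method,
        // after the lookup below had already created a cache entry.
        checkOperation();

        // Stands in for RetryCache.waitForCompletion(retryCache).
        CacheEntry entry = retryCache.get(callId);
        if (entry != null && entry.success) {
          return; // retried call: replay the previous answer
        }
        entry = new CacheEntry();
        retryCache.put(callId, entry);
        boolean success = false;
        try {
          // ... mutate the namespace and log the edit here ...
          success = true;
        } finally {
          entry.success = success; // stands in for RetryCache.setState(...)
        }
      }

      public static void main(String[] args) throws IOException {
        RetryCacheOrderingSketch nn = new RetryCacheOrderingSketch();
        nn.state = HAState.ACTIVE;
        nn.writeRpc(42L); // first attempt executes the operation
        nn.writeRpc(42L); // retry with the same callId replays the result
      }
    }

With the check hoisted into the RPC layer, only an active NameNode ever
touches the retry cache from the client path; a standby's cache is populated
solely from the edit log, which is what the new expectedUpdateCount
accounting in TestRetryCacheWithHA verifies.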