diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
index 5da36d0270..03e9438a26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
@@ -111,3 +111,7 @@ HDFS-4949 (Unreleased)
     HDFS-5203. Concurrent clients that add a cache directive on the same path
     may prematurely uncache from each other. (Chris Nauroth via Colin Patrick
     McCabe)
+
+    HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
+    call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)
+
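For orientation before the FSEditLogOp changes: every op below delegates to four shared FSEditLogOp helpers (readRpcIds, writeRpcIds, appendRpcIdsToXml, readRpcIdsFromXml) whose bodies are not part of this diff. A minimal sketch of the binary pair, assuming the usual 16-byte client UUID plus an int call id and a version guard on the read side (class name, constant, and defaults here are illustrative, not Hadoop source):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Illustration only, not Hadoop source: the approximate shape of the
 * RPC-id helpers the ops below delegate to. Field names mirror the patch;
 * the length constant and the version guard are assumptions.
 */
class RpcIdFields {
  static final int CLIENT_ID_LENGTH = 16; // client UUID bytes on the wire
  byte[] rpcClientId = new byte[0];       // empty == "no id recorded"
  int rpcCallId = -1;                     // -1    == "no id recorded"

  /** Append the retry-cache key (client UUID + call id) to a record. */
  void writeRpcIds(byte[] clientId, int callId, DataOutputStream out)
      throws IOException {
    out.write(clientId);
    out.writeInt(callId);
  }

  /** Read the key back; records from older log versions simply lack it. */
  void readRpcIds(DataInputStream in, boolean versionSupportsRetryCache)
      throws IOException {
    if (!versionSupportsRetryCache) {
      return; // keep the "no id" defaults so replay skips the retry cache
    }
    rpcClientId = new byte[CLIENT_ID_LENGTH];
    in.readFully(rpcClientId);
    rpcCallId = in.readInt();
  }
}

Writing the pair unconditionally while guarding only the read keeps a new NameNode able to replay edit logs written before the feature existed.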
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index e7123390ac..302c1615bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -2861,6 +2861,10 @@ public String toString() {
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#addPathBasedCacheDirective}
+   */
   static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
     String path;
     short replication;
@@ -2895,6 +2899,7 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
       this.path = FSImageSerialization.readString(in);
       this.replication = FSImageSerialization.readShort(in);
       this.pool = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -2902,6 +2907,7 @@ public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeShort(replication, out);
       FSImageSerialization.writeString(pool, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -2910,6 +2916,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "REPLICATION",
           Short.toString(replication));
       XMLUtils.addSaxString(contentHandler, "POOL", pool);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
@@ -2917,6 +2924,7 @@ void fromXml(Stanza st) throws InvalidXmlException {
       path = st.getValue("PATH");
       replication = Short.parseShort(st.getValue("REPLICATION"));
       pool = st.getValue("POOL");
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -2925,11 +2933,17 @@ public String toString() {
       builder.append("AddPathBasedCacheDirective [");
       builder.append("path=" + path + ",");
       builder.append("replication=" + replication + ",");
-      builder.append("pool=" + pool + "]");
+      builder.append("pool=" + pool);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#removePathBasedCacheDescriptor}
+   */
   static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
     long id;
 
@@ -2950,32 +2964,39 @@ public RemovePathBasedCacheDescriptorOp setId(long id) {
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       this.id = FSImageSerialization.readLong(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(id, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.id = Long.parseLong(st.getValue("ID"));
+      readRpcIdsFromXml(st);
    }
 
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("RemovePathBasedCacheDescriptor [");
-      builder.append("id=" + Long.toString(id) + "]");
+      builder.append("id=" + Long.toString(id));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
   static class AddCachePoolOp extends FSEditLogOp {
     CachePoolInfo info;
 
@@ -2995,21 +3016,25 @@ public AddCachePoolOp setPool(CachePoolInfo info) {
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       info .writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
-      info .writeXmlTo(contentHandler);
+      info.writeXmlTo(contentHandler);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.info = CachePoolInfo.readXmlFrom(st);
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -3020,11 +3045,14 @@ public String toString() {
       builder.append("ownerName=" + info.getOwnerName() + ",");
       builder.append("groupName=" + info.getGroupName() + ",");
       builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
-      builder.append("weight=" + Integer.toString(info.getWeight()) + "]");
+      builder.append("weight=" + Integer.toString(info.getWeight()));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
   static class ModifyCachePoolOp extends FSEditLogOp {
     CachePoolInfo info;
 
@@ -3044,21 +3072,25 @@ public ModifyCachePoolOp setInfo(CachePoolInfo info) {
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       cachePoolInfoToXml(contentHandler, info);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.info = cachePoolInfoFromXml(st);
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -3082,11 +3114,13 @@ public String toString() {
         fields.add("weight=" + info.getWeight());
       }
       builder.append(Joiner.on(",").join(fields));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
   static class RemoveCachePoolOp extends FSEditLogOp {
     String poolName;
 
@@ -3106,28 +3140,34 @@ public RemoveCachePoolOp setPoolName(String poolName) {
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       poolName = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(poolName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.poolName = st.getValue("POOLNAME");
+      readRpcIdsFromXml(st);
     }
 
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("RemoveCachePoolOp [");
-      builder.append("poolName=" + poolName + "]");
+      builder.append("poolName=" + poolName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
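The practical effect of the hunks above: the (clientId, callId) pair that identifies a retried RPC now survives in the edit log, so a NameNode that rebuilds its retry cache from edits (after a restart, or on the standby) can still detect duplicates. A conceptual sketch of that replay path, using a plain in-memory set where the real NameNode uses its RetryCache:

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

/**
 * Conceptual sketch, not Hadoop source: during edit-log replay, each
 * AtMostOnce op re-seeds a duplicate-detection set keyed by
 * (clientId, callId). A client that retries after failover is then
 * answered from the cache instead of re-executing the mutation.
 */
class RetryReplaySketch {
  private final Set<ByteBuffer> applied = new HashSet<>();

  /** Called for every caching op read back from the edit log. */
  void onReplayedEdit(byte[] rpcClientId, int rpcCallId) {
    if (rpcClientId.length > 0 && rpcCallId != -1) { // ids were recorded
      applied.add(key(rpcClientId, rpcCallId));
    }
    // ...then apply the cache-pool/directive change itself...
  }

  /** Retried RPC with the same ids? Serve the recorded outcome. */
  boolean isDuplicate(byte[] rpcClientId, int rpcCallId) {
    return applied.contains(key(rpcClientId, rpcCallId));
  }

  private static ByteBuffer key(byte[] clientId, int callId) {
    ByteBuffer b = ByteBuffer.allocate(clientId.length + 4);
    b.put(clientId).putInt(callId);
    b.flip();
    return b;
  }
}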
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d568b2cf9c..8471eb721d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -993,6 +993,20 @@ public static void runOperations(MiniDFSCluster cluster,
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
+    // OP_ADD_CACHE_POOL 35
+    filesystem.addCachePool(new CachePoolInfo("pool1"));
+    // OP_MODIFY_CACHE_POOL 36
+    filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
+    // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
+    PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
+        new PathBasedCacheDirective.Builder().
+            setPath(new Path("/path")).
+            setPool("pool1").
+            build());
+    // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
+    filesystem.removePathBasedCacheDescriptor(pbcd);
+    // OP_REMOVE_CACHE_POOL 37
+    filesystem.removeCachePool("pool1");
   }
 
   public static void abortStream(DFSOutputStream out) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index ddb7c0fa69..576c3eaf11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -413,7 +413,7 @@ public void testRetryCacheRebuild() throws Exception {
     LightWeightCache<CacheEntry, CacheEntry> cacheSet =
         (LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
 
     Map<CacheEntry, CacheEntry> oldEntries =
         new HashMap<CacheEntry, CacheEntry>();
 
@@ -432,7 +432,7 @@ public void testRetryCacheRebuild() throws Exception {
     assertTrue(namesystem.hasRetryCache());
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
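The jump from 14 to 19 expected entries follows directly from the DFSTestUtil change above: runOperations now issues five additional AtMostOnce caching RPCs (addCachePool, modifyCachePool, addPathBasedCacheDirective, removePathBasedCacheDescriptor, removeCachePool), and each leaves exactly one entry in the rebuilt retry cache.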
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 82deab5938..44f4e64ab6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,12 +54,15 @@
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -147,7 +151,7 @@ public void testRetryCacheOnStandbyNN() throws Exception {
     FSNamesystem fsn0 = cluster.getNamesystem(0);
     LightWeightCache<CacheEntry, CacheEntry> cacheSet =
         (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
 
     Map<CacheEntry, CacheEntry> oldEntries =
         new HashMap<CacheEntry, CacheEntry>();
@@ -168,7 +172,7 @@ public void testRetryCacheOnStandbyNN() throws Exception {
     FSNamesystem fsn1 = cluster.getNamesystem(1);
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
@@ -733,6 +737,208 @@ Object getResult() {
     }
   }
 
+  /** addPathBasedCacheDirective */
+  class AddPathBasedCacheDirectiveOp extends AtMostOnceOp {
+    private String pool;
+    private String path;
+    private PathBasedCacheDescriptor descriptor;
+
+    AddPathBasedCacheDirectiveOp(DFSClient client, String pool, String path) {
+      super("addPathBasedCacheDirective", client);
+      this.pool = pool;
+      this.path = path;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      descriptor = client.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path(path)).
+              setPool(pool).
+              build());
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<PathBasedCacheDescriptor> iter =
+            dfs.listPathBasedCacheDescriptors(pool, new Path(path));
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return descriptor;
+    }
+  }
+
+  /** removePathBasedCacheDescriptor */
+  class RemovePathBasedCacheDescriptorOp extends AtMostOnceOp {
+    private String pool;
+    private String path;
+    private PathBasedCacheDescriptor descriptor;
+
+    RemovePathBasedCacheDescriptorOp(DFSClient client, String pool,
+        String path) {
+      super("removePathBasedCacheDescriptor", client);
+      this.pool = pool;
+      this.path = path;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(pool));
+      descriptor = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path(path)).
+              setPool(pool).
+              build());
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removePathBasedCacheDescriptor(descriptor.getEntryId());
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<PathBasedCacheDescriptor> iter =
+            dfs.listPathBasedCacheDescriptors(pool, new Path(path));
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** addCachePool */
+  class AddCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    AddCachePoolOp(DFSClient client, String pool) {
+      super("addCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** modifyCachePool */
+  class ModifyCachePoolOp extends AtMostOnceOp {
+    String pool;
+
+    ModifyCachePoolOp(DFSClient client, String pool) {
+      super("modifyCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool).setWeight(10));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (iter.hasNext() && iter.next().getWeight() == 99) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** removeCachePool */
+  class RemoveCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    RemoveCachePoolOp(DFSClient client, String pool) {
+      super("removeCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removeCachePool(pool);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
   @Test (timeout=60000)
   public void testCreateSnapshot() throws Exception {
     final DFSClient client = genClientWithDummyHandler();
@@ -810,6 +1016,42 @@ public void testUpdatePipeline() throws Exception {
     testClientRetryWithFailover(op);
   }
 
+  @Test (timeout=60000)
+  public void testAddPathBasedCacheDirective() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client, "pool", "/path");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemovePathBasedCacheDescriptor() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemovePathBasedCacheDescriptorOp(client, "pool",
+        "/path");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testAddCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testModifyCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ModifyCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemoveCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
   /**
    * When NN failover happens, if the client did not receive the response and
    * send a retry request to the other NN, the same response should be recieved
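A note on the two test resources that follow: editsStored is the binary golden edit log consumed by the offline-edits-viewer test, and editsStored.xml is its XML rendering; the XML form can be regenerated from the binary with the offline edits viewer (hdfs oev -i editsStored -o editsStored.xml). Both must be updated in lockstep whenever an opcode's serialization changes, as it does here.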
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index ffd6601a9b..17b95dabbb 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
index 160995daad..f013c25a72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
@@ -822,6 +822,8 @@
       <MODE>493</MODE>
       <WEIGHT>100</WEIGHT>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>75</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -833,6 +835,8 @@
       <GROUPNAME>party</GROUPNAME>
       <MODE>448</MODE>
       <WEIGHT>1989</WEIGHT>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>76</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -842,6 +846,8 @@
       <PATH>/bar</PATH>
       <REPLICATION>1</REPLICATION>
       <POOL>poolparty</POOL>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>77</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -849,6 +855,8 @@
     <DATA>
       <TXID>64</TXID>
       <ID>1</ID>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>78</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -856,6 +864,8 @@
     <DATA>
      <TXID>65</TXID>
      <POOLNAME>poolparty</POOLNAME>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>79</RPC_CALLID>
     </DATA>
   </RECORD>
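To close, a schematic of what the patched writeFields() order produces for an OP_ADD_PATH_BASED_CACHE_DIRECTIVE record body. Plain DataOutputStream calls stand in for FSImageSerialization (whose actual string and short encodings differ), and the 16-byte client-id length is an assumption carried over from the earlier sketch; the point is only the field order, with the retry-cache key appended at the end. The sample values are the ones visible in the editsStored.xml hunk above:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Illustrative layout of the patched record body; not Hadoop source. */
public class RecordLayoutSketch {
  static byte[] encode(String path, short replication, String pool,
      byte[] rpcClientId, int rpcCallId) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(path);          // FSImageSerialization.writeString(path, out)
    out.writeShort(replication); // FSImageSerialization.writeShort(...)
    out.writeUTF(pool);          // FSImageSerialization.writeString(pool, out)
    out.write(rpcClientId);      // new: writeRpcIds(...) appends the client UUID...
    out.writeInt(rpcCallId);     // ...followed by the int call id
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] clientId = new byte[16]; // a real client sends its RPC UUID here
    byte[] record = encode("/bar", (short) 1, "poolparty", clientId, 77);
    System.out.println("record body: " + record.length + " bytes");
  }
}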