diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java index f7ba8123d5..6f1121ef9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java @@ -21,6 +21,8 @@ import java.lang.reflect.Method; import java.util.Arrays; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,9 +200,16 @@ public Object[] getParams(RemoteLocationContext context) { for (int i = 0; i < this.params.length; i++) { Object currentObj = this.params[i]; if (currentObj instanceof RemoteParam) { - // Map the parameter using the context RemoteParam paramGetter = (RemoteParam) currentObj; - objList[i] = paramGetter.getParameterForContext(context); + // Map the parameter using the context + if (this.types[i] == CacheDirectiveInfo.class) { + CacheDirectiveInfo path = + (CacheDirectiveInfo) paramGetter.getParameterForContext(context); + objList[i] = new CacheDirectiveInfo.Builder(path) + .setPath(new Path(context.getDest())).build(); + } else { + objList[i] = paramGetter.getParameterForContext(context); + } } else { objList[i] = currentObj; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java new file mode 100644 index 0000000000..e25d8b269d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +/** + * Module that implements all the RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to Cache Admin + * in the {@link RouterRpcServer}. + */ +public class RouterCacheAdmin { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + public RouterCacheAdmin(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcClient.getNamenodeResolver(); + } + + public long addCacheDirective(CacheDirectiveInfo path, + EnumSet flags) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + final List locations = + rpcServer.getLocationsForPath(path.getPath().toString(), true, false); + RemoteMethod method = new RemoteMethod("addCacheDirective", + new Class[] {CacheDirectiveInfo.class, EnumSet.class}, + new RemoteParam(getRemoteMap(path, locations)), flags); + Map response = + rpcClient.invokeConcurrent(locations, method, false, false, long.class); + return response.values().iterator().next(); + } + + public void modifyCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + Path p = directive.getPath(); + if (p != null) { + final List locations = rpcServer + .getLocationsForPath(directive.getPath().toString(), true, false); + RemoteMethod method = new RemoteMethod("modifyCacheDirective", + new Class[] {CacheDirectiveInfo.class, EnumSet.class}, + new RemoteParam(getRemoteMap(directive, locations)), flags); + rpcClient.invokeConcurrent(locations, method); + return; + } + RemoteMethod method = new RemoteMethod("modifyCacheDirective", + new Class[] {CacheDirectiveInfo.class, EnumSet.class}, directive, + flags); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, false, false); + } + + public void removeCacheDirective(long id) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + RemoteMethod method = new RemoteMethod("removeCacheDirective", + new Class[] {long.class}, id); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, false, false); + } + + public BatchedEntries listCacheDirectives(long prevId, + CacheDirectiveInfo filter) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + if (filter.getPath() != null) { + final List locations = rpcServer + .getLocationsForPath(filter.getPath().toString(), true, false); + RemoteMethod method = new RemoteMethod("listCacheDirectives", + new Class[] {long.class, CacheDirectiveInfo.class}, prevId, + new RemoteParam(getRemoteMap(filter, locations))); + Map response = rpcClient.invokeConcurrent( + locations, method, false, false, BatchedEntries.class); + return response.values().iterator().next(); + } + RemoteMethod method = new RemoteMethod("listCacheDirectives", + new Class[] {long.class, CacheDirectiveInfo.class}, prevId, + filter); + Set nss = namenodeResolver.getNamespaces(); + Map results = rpcClient + .invokeConcurrent(nss, method, true, false, BatchedEntries.class); + return results.values().iterator().next(); + } + + public void addCachePool(CachePoolInfo info) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + RemoteMethod method = new RemoteMethod("addCachePool", + new Class[] {CachePoolInfo.class}, info); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void modifyCachePool(CachePoolInfo info) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + RemoteMethod method = new RemoteMethod("modifyCachePool", + new Class[] {CachePoolInfo.class}, info); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void removeCachePool(String cachePoolName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true); + RemoteMethod method = new RemoteMethod("removeCachePool", + new Class[] {String.class}, cachePoolName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public BatchedEntries listCachePools(String prevKey) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + RemoteMethod method = new RemoteMethod("listCachePools", + new Class[] {String.class}, prevKey); + Set nss = namenodeResolver.getNamespaces(); + Map results = rpcClient + .invokeConcurrent(nss, method, true, false, BatchedEntries.class); + return results.values().iterator().next(); + } + + /** + * Returns a map with the CacheDirectiveInfo mapped to each location. + * @param path CacheDirectiveInfo to be mapped to the locations. + * @param locations the locations to map. + * @return map with CacheDirectiveInfo mapped to the locations. + */ + private Map getRemoteMap( + CacheDirectiveInfo path, final List locations) { + final Map dstMap = new HashMap<>(); + Iterator iterator = locations.iterator(); + while (iterator.hasNext()) { + dstMap.put(iterator.next(), path); + } + return dstMap; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 550f5e7e90..66718fb12e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -132,6 +132,8 @@ public class RouterClientProtocol implements ClientProtocol { private final String superGroup; /** Erasure coding calls. */ private final ErasureCoding erasureCoding; + /** Cache Admin calls. */ + private final RouterCacheAdmin routerCacheAdmin; /** StoragePolicy calls. **/ private final RouterStoragePolicy storagePolicy; /** Router security manager to handle token operations. */ @@ -164,6 +166,7 @@ public class RouterClientProtocol implements ClientProtocol { DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.erasureCoding = new ErasureCoding(rpcServer); this.storagePolicy = new RouterStoragePolicy(rpcServer); + this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); this.securityManager = rpcServer.getRouterSecurityManager(); } @@ -1259,48 +1262,45 @@ public SnapshotDiffReportListing getSnapshotDiffReportListing( @Override public long addCacheDirective(CacheDirectiveInfo path, EnumSet flags) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); - return 0; + return routerCacheAdmin.addCacheDirective(path, flags); } @Override public void modifyCacheDirective(CacheDirectiveInfo directive, EnumSet flags) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + routerCacheAdmin.modifyCacheDirective(directive, flags); } @Override public void removeCacheDirective(long id) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + routerCacheAdmin.removeCacheDirective(id); } @Override - public BatchedEntries listCacheDirectives( - long prevId, CacheDirectiveInfo filter) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); - return null; + public BatchedEntries listCacheDirectives(long prevId, + CacheDirectiveInfo filter) throws IOException { + return routerCacheAdmin.listCacheDirectives(prevId, filter); } @Override public void addCachePool(CachePoolInfo info) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + routerCacheAdmin.addCachePool(info); } @Override public void modifyCachePool(CachePoolInfo info) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + routerCacheAdmin.modifyCachePool(info); } @Override public void removeCachePool(String cachePoolName) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + routerCacheAdmin.removeCachePool(cachePoolName); } @Override public BatchedEntries listCachePools(String prevKey) throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); - return null; + return routerCacheAdmin.listCachePools(prevKey); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index d943076775..2f7eb6e917 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -61,13 +61,18 @@ import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; @@ -1439,6 +1444,63 @@ public Boolean get() { cluster.waitNamenodeRegistration(); } + @Test + public void testCacheAdmin() throws Exception { + DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS; + // Verify cache directive commands. + CachePoolInfo cpInfo = new CachePoolInfo("Check"); + cpInfo.setOwnerName("Owner"); + + // Add a cache pool. + routerProtocol.addCachePool(cpInfo); + RemoteIterator iter = routerDFS.listCachePools(); + assertTrue(iter.hasNext()); + + // Modify a cache pool. + CachePoolInfo info = iter.next().getInfo(); + assertEquals("Owner", info.getOwnerName()); + cpInfo.setOwnerName("new Owner"); + routerProtocol.modifyCachePool(cpInfo); + iter = routerDFS.listCachePools(); + assertTrue(iter.hasNext()); + info = iter.next().getInfo(); + assertEquals("new Owner", info.getOwnerName()); + + // Remove a cache pool. + routerProtocol.removeCachePool("Check"); + iter = routerDFS.listCachePools(); + assertFalse(iter.hasNext()); + + // Verify cache directive commands. + cpInfo.setOwnerName("Owner"); + routerProtocol.addCachePool(cpInfo); + routerDFS.mkdirs(new Path("/ns1/dir")); + + // Add a cache directive. + CacheDirectiveInfo cacheDir = new CacheDirectiveInfo.Builder() + .setPath(new Path("/ns1/dir")) + .setReplication((short) 1) + .setPool("Check") + .build(); + long id = routerDFS.addCacheDirective(cacheDir); + CacheDirectiveInfo filter = + new CacheDirectiveInfo.Builder().setPath(new Path("/ns1/dir")).build(); + assertTrue(routerDFS.listCacheDirectives(filter).hasNext()); + + // List cache directive. + assertEquals("Check", + routerDFS.listCacheDirectives(filter).next().getInfo().getPool()); + cacheDir = new CacheDirectiveInfo.Builder().setReplication((short) 2) + .setId(id).setPath(new Path("/ns1/dir")).build(); + + // Modify cache directive. + routerDFS.modifyCacheDirective(cacheDir); + assertEquals((short) 2, (short) routerDFS.listCacheDirectives(filter).next() + .getInfo().getReplication()); + routerDFS.removeCacheDirective(id); + assertFalse(routerDFS.listCacheDirectives(filter).hasNext()); + } + /** * Check the erasure coding policies in the Router and the Namenode. * @return The erasure coding policies.