diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddErasureCodingPolicyResponse.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddErasureCodingPolicyResponse.java index 2e8d081c19..dc77a47a94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddErasureCodingPolicyResponse.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AddErasureCodingPolicyResponse.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.protocol; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.HadoopIllegalArgumentException; /** @@ -65,4 +67,26 @@ public String toString() { + "error message is " + getErrorMsg(); } } + + @Override + public boolean equals(Object o) { + if (o instanceof AddErasureCodingPolicyResponse) { + AddErasureCodingPolicyResponse other = (AddErasureCodingPolicyResponse) o; + return new EqualsBuilder() + .append(policy, other.policy) + .append(succeed, other.succeed) + .append(errorMsg, other.errorMsg) + .isEquals(); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(303855623, 582626729) + .append(policy) + .append(succeed) + .append(errorMsg) + .toHashCode(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java new file mode 100644 index 0000000000..d2b2d50fdb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; + +/** + * Module that implements all the RPC calls in + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to + * Erasure Coding in the {@link RouterRpcServer}. + */ +public class ErasureCoding { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + + public ErasureCoding(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcClient.getNamenodeResolver(); + } + + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingPolicies"); + Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, ErasureCodingPolicyInfo[].class); + return merge(ret, ErasureCodingPolicyInfo.class); + } + + public Map getErasureCodingCodecs() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingCodecs"); + Set nss = namenodeResolver.getNamespaces(); + @SuppressWarnings("rawtypes") + Map retCodecs = + rpcClient.invokeConcurrent( + nss, method, true, false, Map.class); + + Map ret = new HashMap<>(); + Object obj = retCodecs; + @SuppressWarnings("unchecked") + Map> results = + (Map>)obj; + Collection> allCodecs = results.values(); + for (Map codecs : allCodecs) { + ret.putAll(codecs); + } + + return ret; + } + + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("addErasureCodingPolicies", + new Class[] {ErasureCodingPolicy[].class}, new Object[] {policies}); + Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, AddErasureCodingPolicyResponse[].class); + + return merge(ret, AddErasureCodingPolicyResponse.class); + } + + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("removeErasureCodingPolicy", + new Class[] {String.class}, ecPolicyName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("disableErasureCodingPolicy", + new Class[] {String.class}, ecPolicyName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("enableErasureCodingPolicy", + new Class[] {String.class}, ecPolicyName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy", + new Class[] {String.class}, new RemoteParam()); + ErasureCodingPolicy ret = rpcClient.invokeSequential( + locations, remoteMethod, null, null); + return ret; + } + + public void setErasureCodingPolicy(String src, String ecPolicyName) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy", + new Class[] {String.class, String.class}, + new RemoteParam(), ecPolicyName); + rpcClient.invokeSequential(locations, remoteMethod, null, null); + } + + public void unsetErasureCodingPolicy(String src) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, remoteMethod, null, null); + } + + public ECBlockGroupStats getECBlockGroupStats() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getECBlockGroupStats"); + Set nss = namenodeResolver.getNamespaces(); + Map allStats = + rpcClient.invokeConcurrent( + nss, method, true, false, ECBlockGroupStats.class); + + // Merge the stats from all the namespaces + long lowRedundancyBlockGroups = 0; + long corruptBlockGroups = 0; + long missingBlockGroups = 0; + long bytesInFutureBlockGroups = 0; + long pendingDeletionBlocks = 0; + for (ECBlockGroupStats stats : allStats.values()) { + lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups(); + corruptBlockGroups += stats.getCorruptBlockGroups(); + missingBlockGroups += stats.getMissingBlockGroups(); + bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups(); + pendingDeletionBlocks += stats.getPendingDeletionBlocks(); + } + return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups, + missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index f5e5272ac1..dbb6ffa0fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -94,8 +94,8 @@ public QuotaUsage getQuotaUsage(String path) throws IOException { final List quotaLocs = getValidQuotaLocations(path); RemoteMethod method = new RemoteMethod("getQuotaUsage", new Class[] {String.class}, new RemoteParam()); - Map results = rpcClient.invokeConcurrent(quotaLocs, - method, true, false); + Map results = rpcClient.invokeConcurrent( + quotaLocs, method, true, false, QuotaUsage.class); return aggregateQuota(results); } @@ -151,14 +151,14 @@ private List getValidQuotaLocations(String path) * @param results Quota query result. * @return Aggregated Quota. */ - private QuotaUsage aggregateQuota(Map results) { + private QuotaUsage aggregateQuota(Map results) { long nsCount = 0; long ssCount = 0; boolean hasQuotaUnSet = false; - for (Map.Entry entry : results.entrySet()) { + for (Map.Entry entry : results.entrySet()) { RemoteLocation loc = entry.getKey(); - QuotaUsage usage = (QuotaUsage) entry.getValue(); + QuotaUsage usage = entry.getValue(); if (usage != null) { // If quota is not set in real FileSystem, the usage // value will return -1. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index cac37132ad..4209a49302 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -147,6 +147,14 @@ public RouterRpcClient(Configuration conf, String identifier, failoverSleepBaseMillis, failoverSleepMaxMillis); } + /** + * Get the active namenode resolver used by this client. + * @return Active namenode resolver. + */ + public ActiveNamenodeResolver getNamenodeResolver() { + return this.namenodeResolver; + } + /** * Shutdown the client. */ @@ -617,9 +625,9 @@ public Object invokeSequential( * @throws IOException if the success condition is not met, return the first * remote exception generated. */ - public Object invokeSequential( + public T invokeSequential( final List locations, - final RemoteMethod remoteMethod, Class expectedResultClass, + final RemoteMethod remoteMethod, Class expectedResultClass, Object expectedResultValue) throws IOException { final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); @@ -639,7 +647,9 @@ public Object invokeSequential( if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { // Valid result, stop here - return result; + @SuppressWarnings("unchecked") + T ret = (T)result; + return ret; } if (firstResult == null) { firstResult = result; @@ -669,7 +679,9 @@ public Object invokeSequential( throw firstThrownException; } // Return the last result, whether it is the value we are looking for or a - return firstResult; + @SuppressWarnings("unchecked") + T ret = (T)firstResult; + return ret; } /** @@ -709,7 +721,7 @@ private static boolean isExpectedValue(Object expectedValue, Object value) { } /** - * Invokes multiple concurrent proxy calls to different clients. Returns an + * Invoke multiple concurrent proxy calls to different clients. Returns an * array of results. * * Re-throws exceptions generated by the remote RPC call as either @@ -722,14 +734,12 @@ private static boolean isExpectedValue(Object expectedValue, Object value) { * not complete. If false exceptions are ignored and all data results * successfully received are returned. * @param standby If the requests should go to the standby namenodes too. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requiredResponse=true and any of the calls throw an - * exception. + * @throws IOException If all the calls throw an exception. */ - public Map invokeConcurrent( + public void invokeConcurrent( final Collection locations, final RemoteMethod method, boolean requireResponse, boolean standby) throws IOException { - return invokeConcurrent(locations, method, requireResponse, standby, -1); + invokeConcurrent(locations, method, requireResponse, standby, void.class); } /** @@ -739,6 +749,36 @@ public Map invokeConcurrent( * Re-throws exceptions generated by the remote RPC call as either * RemoteException or IOException. * + * @param The type of the remote location. + * @param The type of the remote method return. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param standby If the requests should go to the standby namenodes too. + * @param clazz Type of the remote return type. + * @return Result of invoking the method per subcluster: nsId -> result. + * @throws IOException If requiredResponse=true and any of the calls throw an + * exception. + */ + public Map invokeConcurrent( + final Collection locations, final RemoteMethod method, + boolean requireResponse, boolean standby, Class clazz) + throws IOException { + return invokeConcurrent( + locations, method, requireResponse, standby, -1, clazz); + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param The type of the remote method return. * @param locations List of remote locations to call concurrently. * @param method The remote method and parameters to invoke. * @param requireResponse If true an exception will be thrown if all calls do @@ -746,14 +786,15 @@ public Map invokeConcurrent( * successfully received are returned. * @param standby If the requests should go to the standby namenodes too. * @param timeOutMs Timeout for each individual call. + * @param clazz Type of the remote return type. * @return Result of invoking the method per subcluster: nsId -> result. * @throws IOException If requiredResponse=true and any of the calls throw an * exception. */ @SuppressWarnings("unchecked") - public Map invokeConcurrent( + public Map invokeConcurrent( final Collection locations, final RemoteMethod method, - boolean requireResponse, boolean standby, long timeOutMs) + boolean requireResponse, boolean standby, long timeOutMs, Class clazz) throws IOException { final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); @@ -767,7 +808,7 @@ public Map invokeConcurrent( getNamenodesForNameservice(ns); Object[] paramList = method.getParams(location); Object result = invokeMethod(ugi, namenodes, m, paramList); - return Collections.singletonMap(location, result); + return Collections.singletonMap(location, clazz.cast(result)); } List orderedLocations = new LinkedList<>(); @@ -817,14 +858,14 @@ public Object call() throws Exception { } else { futures = executorService.invokeAll(callables); } - Map results = new TreeMap<>(); + Map results = new TreeMap<>(); Map exceptions = new TreeMap<>(); for (int i=0; i future = futures.get(i); Object result = future.get(); - results.put(location, result); + results.put(location, clazz.cast(result)); } catch (CancellationException ce) { T loc = orderedLocations.get(i); String msg = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 73b189ed9b..9afd441ccd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -28,12 +28,14 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Array; import java.net.InetSocketAddress; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -181,8 +183,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** Category of the operation that a thread is executing. */ private final ThreadLocal opCategory = new ThreadLocal<>(); + // Modules implementing groups of RPC calls /** Router Quota calls. */ private final Quota quotaCall; + /** Erasure coding calls. */ + private final ErasureCoding erasureCoding; + /** * Construct a router RPC server. @@ -282,6 +288,7 @@ public RouterRpcServer(Configuration configuration, Router router, // Initialize modules this.quotaCall = new Quota(this.router, this); + this.erasureCoding = new ErasureCoding(this); } @Override @@ -367,7 +374,7 @@ public InetSocketAddress getRpcAddress() { * client requests. * @throws UnsupportedOperationException If the operation is not supported. */ - private void checkOperation(OperationCategory op, boolean supported) + protected void checkOperation(OperationCategory op, boolean supported) throws StandbyException, UnsupportedOperationException { checkOperation(op); @@ -949,8 +956,9 @@ public DirectoryListing getListing(String src, byte[] startAfter, RemoteMethod method = new RemoteMethod("getListing", new Class[] {String.class, startAfter.getClass(), boolean.class}, new RemoteParam(), startAfter, needLocation); - Map listings = - rpcClient.invokeConcurrent(locations, method, false, false); + Map listings = + rpcClient.invokeConcurrent( + locations, method, false, false, DirectoryListing.class); Map nnListing = new TreeMap<>(); int totalRemainingEntries = 0; @@ -959,9 +967,10 @@ public DirectoryListing getListing(String src, byte[] startAfter, if (listings != null) { // Check the subcluster listing with the smallest name String lastName = null; - for (Entry entry : listings.entrySet()) { + for (Entry entry : + listings.entrySet()) { RemoteLocation location = entry.getKey(); - DirectoryListing listing = (DirectoryListing) entry.getValue(); + DirectoryListing listing = entry.getValue(); if (listing == null) { LOG.debug("Cannot get listing from {}", location); } else { @@ -1097,11 +1106,10 @@ public long[] getStats() throws IOException { RemoteMethod method = new RemoteMethod("getStats"); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false, long[].class); long[] combinedData = new long[STATS_ARRAY_LENGTH]; - for (Object o : results.values()) { - long[] data = (long[]) o; + for (long[] data : results.values()) { for (int i = 0; i < combinedData.length && i < data.length; i++) { if (data[i] >= 0) { combinedData[i] += data[i]; @@ -1134,11 +1142,13 @@ public DatanodeInfo[] getDatanodeReport( new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false, timeOutMs); - for (Entry entry : results.entrySet()) { + Map results = + rpcClient.invokeConcurrent( + nss, method, true, false, timeOutMs, DatanodeInfo[].class); + for (Entry entry : + results.entrySet()) { FederationNamespaceInfo ns = entry.getKey(); - DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue(); + DatanodeInfo[] result = entry.getValue(); for (DatanodeInfo node : result) { String nodeId = node.getXferAddr(); if (!datanodesMap.containsKey(nodeId)) { @@ -1168,10 +1178,10 @@ public DatanodeStorageReport[] getDatanodeStorageReport( RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false); - for (Object r : results.values()) { - DatanodeStorageReport[] result = (DatanodeStorageReport[]) r; + Map results = + rpcClient.invokeConcurrent( + nss, method, true, false, DatanodeStorageReport[].class); + for (DatanodeStorageReport[] result : results.values()) { for (DatanodeStorageReport node : result) { String nodeId = node.getDatanodeInfo().getXferAddr(); if (!datanodesMap.containsKey(nodeId)) { @@ -1199,17 +1209,14 @@ public boolean setSafeMode(SafeModeAction action, boolean isChecked) new Class[] {SafeModeAction.class, boolean.class}, action, isChecked); Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, true); + Map results = + rpcClient.invokeConcurrent(nss, method, true, true, boolean.class); // We only report true if all the name space are in safe mode int numSafemode = 0; - for (Object result : results.values()) { - if (result instanceof Boolean) { - boolean safemode = (boolean) result; - if (safemode) { - numSafemode++; - } + for (boolean safemode : results.values()) { + if (safemode) { + numSafemode++; } } return numSafemode == results.size(); @@ -1222,18 +1229,14 @@ public boolean restoreFailedStorage(String arg) throws IOException { RemoteMethod method = new RemoteMethod("restoreFailedStorage", new Class[] {String.class}, arg); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); boolean success = true; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection sucesses = results.values(); - for (boolean s : sucesses) { + for (boolean s : ret.values()) { if (!s) { success = false; + break; } } return success; @@ -1246,18 +1249,14 @@ public boolean saveNamespace(long timeWindow, long txGap) throws IOException { RemoteMethod method = new RemoteMethod("saveNamespace", new Class[] {Long.class, Long.class}, timeWindow, txGap); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); boolean success = true; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection sucesses = results.values(); - for (boolean s : sucesses) { + for (boolean s : ret.values()) { if (!s) { success = false; + break; } } return success; @@ -1269,17 +1268,12 @@ public long rollEdits() throws IOException { RemoteMethod method = new RemoteMethod("rollEdits", new Class[] {}); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); // Return the maximum txid long txid = 0; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection txids = results.values(); - for (long t : txids) { + for (long t : ret.values()) { if (t > txid) { txid = t; } @@ -1314,17 +1308,13 @@ public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) RemoteMethod method = new RemoteMethod("rollingUpgrade", new Class[] {RollingUpgradeAction.class}, action); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, RollingUpgradeInfo.class); // Return the first rolling upgrade info RollingUpgradeInfo info = null; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection infos = results.values(); - for (RollingUpgradeInfo infoNs : infos) { + for (RollingUpgradeInfo infoNs : ret.values()) { if (info == null && infoNs != null) { info = infoNs; } @@ -1376,10 +1366,9 @@ public ContentSummary getContentSummary(String path) throws IOException { final List locations = getLocationsForPath(path, false); RemoteMethod method = new RemoteMethod("getContentSummary", new Class[] {String.class}, new RemoteParam()); - @SuppressWarnings("unchecked") - Map results = - (Map) ((Object)rpcClient.invokeConcurrent( - locations, method, false, false)); + Map results = + rpcClient.invokeConcurrent( + locations, method, false, false, ContentSummary.class); summaries.addAll(results.values()); } catch (FileNotFoundException e) { notFoundException = e; @@ -1773,17 +1762,12 @@ public long getCurrentEditLogTxid() throws IOException { RemoteMethod method = new RemoteMethod( "getCurrentEditLogTxid", new Class[] {}); final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); // Return the maximum txid long txid = 0; - Object obj = ret; - @SuppressWarnings("unchecked") - Map results = - (Map)obj; - Collection txids = results.values(); - for (long t : txids) { + for (long t : ret.values()) { if (t > txid) { txid = t; } @@ -1816,31 +1800,6 @@ public void deleteSnapshot(String snapshotRoot, String snapshotName) checkOperation(OperationCategory.WRITE, false); } - @Override - public ErasureCodingPolicyInfo[] getErasureCodingPolicies() - throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override // ClientProtocol - public ErasureCodingPolicy getErasureCodingPolicy(String src) - throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override // ClientProtocol - public void setErasureCodingPolicy(String src, String ecPolicyName) - throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override // ClientProtocol - public void unsetErasureCodingPolicy(String src) throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - @Override // ClientProtocol public void setQuota(String path, long namespaceQuota, long storagespaceQuota, StorageType type) throws IOException { @@ -1894,38 +1853,61 @@ public BlockStoragePolicy getStoragePolicy(String path) throws IOException { return null; } - @Override + @Override // ClientProtocol + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + return erasureCoding.getErasureCodingPolicies(); + } + + @Override // ClientProtocol + public Map getErasureCodingCodecs() throws IOException { + return erasureCoding.getErasureCodingCodecs(); + } + + @Override // ClientProtocol public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( ErasureCodingPolicy[] policies) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return null; + return erasureCoding.addErasureCodingPolicies(policies); } - @Override - public void removeErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); + @Override // ClientProtocol + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.removeErasureCodingPolicy(ecPolicyName); } - @Override - public void disableErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); + @Override // ClientProtocol + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.disableErasureCodingPolicy(ecPolicyName); } - @Override - public void enableErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); + @Override // ClientProtocol + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.enableErasureCodingPolicy(ecPolicyName); + } + + @Override // ClientProtocol + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + return erasureCoding.getErasureCodingPolicy(src); + } + + @Override // ClientProtocol + public void setErasureCodingPolicy(String src, String ecPolicyName) + throws IOException { + erasureCoding.setErasureCodingPolicy(src, ecPolicyName); + } + + @Override // ClientProtocol + public void unsetErasureCodingPolicy(String src) throws IOException { + erasureCoding.unsetErasureCodingPolicy(src); } @Override public ECBlockGroupStats getECBlockGroupStats() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override - public Map getErasureCodingCodecs() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return erasureCoding.getECBlockGroupStats(); } @Override @@ -2127,6 +2109,39 @@ static UserGroupInformation getRemoteUser() throws IOException { return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); } + /** + * Merge the outputs from multiple namespaces. + * @param map Namespace -> Output array. + * @param clazz Class of the values. + * @return Array with the outputs. + */ + protected static T[] merge( + Map map, Class clazz) { + + // Put all results into a set to avoid repeats + Set ret = new LinkedHashSet<>(); + for (T[] values : map.values()) { + for (T val : values) { + ret.add(val); + } + } + + return toArray(ret, clazz); + } + + /** + * Convert a set of values into an array. + * @param set Input set. + * @param clazz Class of the values. + * @return Array with the values in set. + */ + private static T[] toArray(Set set, Class clazz) { + @SuppressWarnings("unchecked") + T[] combinedData = (T[]) Array.newInstance(clazz, set.size()); + combinedData = set.toArray(combinedData); + return combinedData; + } + /** * Get quota module implement. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 7424499791..8f8bd3ebcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -109,6 +109,8 @@ public class RouterDFSCluster { private List routers; /** If the Namenodes are in high availability.*/ private boolean highAvailability; + /** Number of datanodes per nameservice. */ + private int numDatanodesPerNameservice = 2; /** Mini cluster. */ private MiniDFSCluster cluster; @@ -356,8 +358,8 @@ public RouterDFSCluster(boolean ha, int numNameservices) { DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); } - public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) { - this(ha, numNameservices, numNamnodes, + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + this(ha, numNameservices, numNamenodes, DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); } @@ -531,6 +533,10 @@ public void configureNameservices(int numNameservices, int numNamenodes) { } } + public void setNumDatanodesPerNameservice(int num) { + this.numDatanodesPerNameservice = num; + } + public String getNameservicesKey() { StringBuilder sb = new StringBuilder(); for (String nsId : this.nameservices) { @@ -658,7 +664,7 @@ public void startCluster(Configuration overrideConf) { nnConf.addResource(overrideConf); } cluster = new MiniDFSCluster.Builder(nnConf) - .numDataNodes(nameservices.size()*2) + .numDataNodes(nameservices.size() * numDatanodesPerNameservice) .nnTopology(topology) .build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index af506c9908..6a8c0e1ea6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -24,19 +24,24 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Method; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeSet; @@ -53,10 +58,15 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -68,12 +78,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; /** * The the RPC interface of the {@link Router} implemented by @@ -81,6 +97,20 @@ */ public class TestRouterRpc { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterRpc.class); + + private static final Comparator EC_POLICY_CMP = + new Comparator() { + public int compare( + ErasureCodingPolicyInfo ec0, + ErasureCodingPolicyInfo ec1) { + String name0 = ec0.getPolicy().getName(); + String name1 = ec1.getPolicy().getName(); + return name0.compareTo(name1); + } + }; + /** Federated HDFS cluster. */ private static RouterDFSCluster cluster; @@ -111,6 +141,8 @@ public class TestRouterRpc { @BeforeClass public static void globalSetUp() throws Exception { cluster = new RouterDFSCluster(false, 2); + // We need 6 DNs to test Erasure Coding with RS-6-3-64k + cluster.setNumDatanodesPerNameservice(6); // Start NNs and DNs and wait until ready cluster.startCluster(); @@ -144,9 +176,9 @@ public void testSetup() throws Exception { // Wait to ensure NN has fully created its test directories Thread.sleep(100); - // Pick a NS, namenode and router for this test + // Default namenode and random router for this test this.router = cluster.getRandomRouter(); - this.ns = cluster.getRandomNameservice(); + this.ns = cluster.getNameservices().get(0); this.namenode = cluster.getNamenode(ns, null); // Handles to the ClientProtocol interface @@ -481,7 +513,7 @@ public void testProxyGetStats() throws Exception { for (int i = 0; i < data.length; i++) { individualData[i] += data[i]; } - assert(data.length == combinedData.length); + assertEquals(data.length, combinedData.length); } for (int i = 0; i < combinedData.length && i < individualData.length; i++) { @@ -489,7 +521,9 @@ public void testProxyGetStats() throws Exception { // Skip available storage as this fluctuates in mini cluster continue; } - assertEquals(combinedData[i], individualData[i]); + assertEquals("Stats for " + i + " don't match: " + + combinedData[i] + "!=" + individualData[i], + combinedData[i], individualData[i]); } } @@ -866,4 +900,153 @@ public void testProxyGetFileInfoAcessException() throws IOException { assertEquals(routerFailure.getClass(), nnFailure.getClass()); } + + @Test + public void testErasureCoding() throws IOException { + + LOG.info("List the available erasurce coding policies"); + ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies(); + for (ErasureCodingPolicyInfo policy : policies) { + LOG.info(" {}", policy); + } + + LOG.info("List the erasure coding codecs"); + Map codecsRouter = routerProtocol.getErasureCodingCodecs(); + Map codecsNamenode = nnProtocol.getErasureCodingCodecs(); + assertTrue(Maps.difference(codecsRouter, codecsNamenode).areEqual()); + for (Entry entry : codecsRouter.entrySet()) { + LOG.info(" {}: {}", entry.getKey(), entry.getValue()); + } + + LOG.info("Create a testing directory via the router at the root level"); + String dirPath = "/testec"; + String filePath1 = dirPath + "/testfile1"; + FsPermission permission = new FsPermission("755"); + routerProtocol.mkdirs(dirPath, permission, false); + createFile(routerFS, filePath1, 32); + assertTrue(verifyFileExists(routerFS, filePath1)); + DFSClient file1Protocol = getFileDFSClient(filePath1); + + LOG.info("The policy for the new file should not be set"); + assertNull(routerProtocol.getErasureCodingPolicy(filePath1)); + assertNull(file1Protocol.getErasureCodingPolicy(filePath1)); + + String policyName = "RS-6-3-1024k"; + LOG.info("Set policy \"{}\" for \"{}\"", policyName, dirPath); + routerProtocol.setErasureCodingPolicy(dirPath, policyName); + + String filePath2 = dirPath + "/testfile2"; + LOG.info("Create {} in the path with the new EC policy", filePath2); + createFile(routerFS, filePath2, 32); + assertTrue(verifyFileExists(routerFS, filePath2)); + DFSClient file2Protocol = getFileDFSClient(filePath2); + + LOG.info("Check that the policy is set for {}", filePath2); + ErasureCodingPolicy policyRouter1 = + routerProtocol.getErasureCodingPolicy(filePath2); + ErasureCodingPolicy policyNamenode1 = + file2Protocol.getErasureCodingPolicy(filePath2); + assertNotNull(policyRouter1); + assertEquals(policyName, policyRouter1.getName()); + assertEquals(policyName, policyNamenode1.getName()); + + LOG.info("Create a new erasure coding policy"); + String newPolicyName = "RS-6-3-128k"; + ECSchema ecSchema = new ECSchema(ErasureCodeConstants.RS_CODEC_NAME, 6, 3); + ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy( + newPolicyName, + ecSchema, + 128 * 1024, + (byte) -1); + ErasureCodingPolicy[] newPolicies = new ErasureCodingPolicy[] { + ecPolicy + }; + AddErasureCodingPolicyResponse[] responses = + routerProtocol.addErasureCodingPolicies(newPolicies); + assertEquals(1, responses.length); + assertTrue(responses[0].isSucceed()); + routerProtocol.disableErasureCodingPolicy(newPolicyName); + + LOG.info("The new policy should be there and disabled"); + policies = checkErasureCodingPolicies(); + boolean found = false; + for (ErasureCodingPolicyInfo policy : policies) { + LOG.info(" {}" + policy); + if (policy.getPolicy().getName().equals(newPolicyName)) { + found = true; + assertEquals(ErasureCodingPolicyState.DISABLED, policy.getState()); + break; + } + } + assertTrue(found); + + LOG.info("Set the test folder to use the new policy"); + routerProtocol.enableErasureCodingPolicy(newPolicyName); + routerProtocol.setErasureCodingPolicy(dirPath, newPolicyName); + + LOG.info("Create a file in the path with the new EC policy"); + String filePath3 = dirPath + "/testfile3"; + createFile(routerFS, filePath3, 32); + assertTrue(verifyFileExists(routerFS, filePath3)); + DFSClient file3Protocol = getFileDFSClient(filePath3); + + ErasureCodingPolicy policyRouterFile3 = + routerProtocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyRouterFile3.getName()); + ErasureCodingPolicy policyNamenodeFile3 = + file3Protocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyNamenodeFile3.getName()); + + LOG.info("Remove the policy and check the one for the test folder"); + routerProtocol.removeErasureCodingPolicy(newPolicyName); + ErasureCodingPolicy policyRouter3 = + routerProtocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyRouter3.getName()); + ErasureCodingPolicy policyNamenode3 = + file3Protocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyNamenode3.getName()); + + LOG.info("Check the stats"); + ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats(); + ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats(); + assertEquals(statsNamenode.toString(), statsRouter.toString()); + } + + /** + * Check the erasure coding policies in the Router and the Namenode. + * @return The erasure coding policies. + */ + private ErasureCodingPolicyInfo[] checkErasureCodingPolicies() + throws IOException { + ErasureCodingPolicyInfo[] policiesRouter = + routerProtocol.getErasureCodingPolicies(); + assertNotNull(policiesRouter); + ErasureCodingPolicyInfo[] policiesNamenode = + nnProtocol.getErasureCodingPolicies(); + Arrays.sort(policiesRouter, EC_POLICY_CMP); + Arrays.sort(policiesNamenode, EC_POLICY_CMP); + assertArrayEquals(policiesRouter, policiesNamenode); + return policiesRouter; + } + + /** + * Find the Namenode for a particular file and return the DFSClient. + * @param path Path of the file to check. + * @return The DFSClient to the Namenode holding the file. + */ + private DFSClient getFileDFSClient(final String path) { + for (String nsId : cluster.getNameservices()) { + LOG.info("Checking {} for {}", nsId, path); + NamenodeContext nn = cluster.getNamenode(nsId, null); + try { + DFSClient nnClientProtocol = nn.getClient(); + if (nnClientProtocol.getFileInfo(path) != null) { + return nnClientProtocol; + } + } catch (Exception ignore) { + // ignore + } + } + return null; + } } \ No newline at end of file