diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml index 41290cc67f..8ba016a3f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml @@ -96,6 +96,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> test test-jar + + org.apache.hadoop + hadoop-distcp + test + com.fasterxml.jackson.core jackson-annotations @@ -115,6 +120,26 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> test test-jar + + org.apache.hadoop + hadoop-mapreduce-client-app + test + + + org.apache.hadoop + hadoop-mapreduce-client-hs + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + test + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + org.apache.curator curator-test diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java index 9e1fb67173..cc5bf07408 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java @@ -697,6 +697,16 @@ public long getHighestPriorityLowRedundancyECBlocks() { MembershipStats::getHighestPriorityLowRedundancyECBlocks); } + @Override + public int getRouterFederationRenameCount() { + return this.router.getRpcServer().getRouterFederationRenameCount(); + } + + @Override + public int getSchedulerJobCount() { + return this.router.getRpcServer().getSchedulerJobCount(); + } + @Override public String getSafemode() { if (this.router.isRouterState(RouterServiceState.SAFEMODE)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java index 087c5b4bac..f5e3228c21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java @@ -108,4 +108,19 @@ public interface RouterMBean { * @return Json string of owners to token counts */ String getTopTokenRealOwners(); + + /** + * Gets the count of the currently running router federation rename jobs. + * + * @return the count of the currently running router federation rename jobs. + */ + int getRouterFederationRenameCount(); + + /** + * Gets the count of the currently running jobs in the scheduler. It includes + * both the submitted and the recovered jobs. + * + * @return the count of the currently running jobs in the scheduler. + */ + int getSchedulerJobCount(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 8fd2e28860..4777da4700 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -348,4 +348,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { NoRouterRpcFairnessPolicyController.class; public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; + + // HDFS Router Federation Rename. + public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = + FEDERATION_ROUTER_PREFIX + "federation.rename."; + public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "option"; + public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT = + "NONE"; + public static final String + DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "force.close.open.file"; + public static final boolean + DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT = true; + public static final String DFS_ROUTER_FEDERATION_RENAME_MAP = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "map"; + public static final String DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "bandwidth"; + public static final String DFS_ROUTER_FEDERATION_RENAME_DELAY = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "delay"; + public static final long DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT = 1000; + public static final String DFS_ROUTER_FEDERATION_RENAME_DIFF = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "diff"; + public static final int DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT = 0; + public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH = + DFS_ROUTER_FEDERATION_RENAME_PREFIX + "trash"; + public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT = + "trash"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index f62f553e03..17524d421c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -126,6 +126,7 @@ public class RouterClientProtocol implements ClientProtocol { private final RouterRpcServer rpcServer; private final RouterRpcClient rpcClient; + private final RouterFederationRename rbfRename; private final FileSubclusterResolver subclusterResolver; private final ActiveNamenodeResolver namenodeResolver; @@ -191,6 +192,7 @@ public class RouterClientProtocol implements ClientProtocol { this.snapshotProto = new RouterSnapshot(rpcServer); this.routerCacheAdmin = new RouterCacheAdmin(rpcServer); this.securityManager = rpcServer.getRouterSecurityManager(); + this.rbfRename = new RouterFederationRename(rpcServer, conf); } @Override @@ -594,13 +596,13 @@ public boolean rename(final String src, final String dst) final List srcLocations = rpcServer.getLocationsForPath(src, true, false); + final List dstLocations = + rpcServer.getLocationsForPath(dst, false, false); // srcLocations may be trimmed by getRenameDestinations() final List locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dst); + RemoteParam dstParam = getRenameDestinations(locs, dstLocations); if (locs.isEmpty()) { - throw new IOException( - "Rename of " + src + " to " + dst + " is not allowed," + - " no eligible destination in the same namespace was found."); + return rbfRename.routerFedRename(src, dst, srcLocations, dstLocations); } RemoteMethod method = new RemoteMethod("rename", new Class[] {String.class, String.class}, @@ -620,13 +622,14 @@ public void rename2(final String src, final String dst, final List srcLocations = rpcServer.getLocationsForPath(src, true, false); + final List dstLocations = + rpcServer.getLocationsForPath(dst, false, false); // srcLocations may be trimmed by getRenameDestinations() final List locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dst); + RemoteParam dstParam = getRenameDestinations(locs, dstLocations); if (locs.isEmpty()) { - throw new IOException( - "Rename of " + src + " to " + dst + " is not allowed," + - " no eligible destination in the same namespace was found."); + rbfRename.routerFedRename(src, dst, srcLocations, dstLocations); + return; } RemoteMethod method = new RemoteMethod("rename2", new Class[] {String.class, String.class, options.getClass()}, @@ -1821,11 +1824,9 @@ public HAServiceProtocol.HAServiceState getHAServiceState() { * @throws IOException If the dst paths could not be determined. */ private RemoteParam getRenameDestinations( - final List srcLocations, final String dst) - throws IOException { + final List srcLocations, + final List dstLocations) throws IOException { - final List dstLocations = - rpcServer.getLocationsForPath(dst, false, false); final Map dstMap = new HashMap<>(); Iterator iterator = srcLocations.iterator(); @@ -2203,4 +2204,8 @@ boolean isMultiDestDirectory(String src) throws IOException { } return false; } + + public int getRouterFederationRenameCount() { + return rbfRename.getRouterFederationRenameCount(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java new file mode 100644 index 0000000000..8074fdd2f6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.tools.fedbalance.DistCpProcedure; +import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs; +import org.apache.hadoop.tools.fedbalance.FedBalanceContext; +import org.apache.hadoop.tools.fedbalance.TrashProcedure; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT; +import static org.apache.hadoop.tools.fedbalance.FedBalance.DISTCP_PROCEDURE; +import static org.apache.hadoop.tools.fedbalance.FedBalance.TRASH_PROCEDURE; +import static org.apache.hadoop.tools.fedbalance.FedBalance.NO_MOUNT; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Rename across router based federation namespaces. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RouterFederationRename { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterFederationRename.class.getName()); + private final RouterRpcServer rpcServer; + private final Configuration conf; + private final AtomicInteger routerRenameCounter = new AtomicInteger(); + public enum RouterRenameOption { + NONE, DISTCP + } + + public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) { + this.rpcServer = rpcServer; + this.conf = conf; + } + + /** + * Router federation rename across namespaces. + * + * @param src the source path. There is no mount point under the src path. + * @param dst the dst path. + * @param srcLocations the remote locations of src. + * @param dstLocations the remote locations of dst. + * @throws IOException if rename fails. + * @return true if rename succeeds. + */ + boolean routerFedRename(final String src, final String dst, + final List srcLocations, + final List dstLocations) throws IOException { + if (!rpcServer.isEnableRenameAcrossNamespace()) { + throw new IOException("Rename of " + src + " to " + dst + + " is not allowed, no eligible destination in the same namespace was" + + " found"); + } + if (srcLocations.size() != 1 || dstLocations.size() != 1) { + throw new IOException("Rename of " + src + " to " + dst + " is not" + + " allowed. The remote location should be exactly one."); + } + RemoteLocation srcLoc = srcLocations.get(0); + RemoteLocation dstLoc = dstLocations.get(0); + // Build and submit router federation rename job. + BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(), + dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest()); + BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler(); + countIncrement(); + try { + scheduler.submit(job); + LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src, dst, + srcLoc.getNameserviceId(), dstLoc.getNameserviceId(), job.getId()); + scheduler.waitUntilDone(job); + if (job.getError() != null) { + throw new IOException("Rename of " + src + " to " + dst + " failed.", + job.getError()); + } + return true; + } finally { + countDecrement(); + } + } + + /** + * Build router federation rename job moving data from src to dst. + * @param srcNs the source namespace id. + * @param dstNs the dst namespace id. + * @param src the source path. + * @param dst the dst path. + */ + private BalanceJob buildRouterRenameJob(String srcNs, String dstNs, + String src, String dst) throws IOException { + checkConfiguration(conf); + Path srcPath = new Path("hdfs://" + srcNs + src); + Path dstPath = new Path("hdfs://" + dstNs + dst); + boolean forceCloseOpen = + conf.getBoolean(DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE, + DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT); + int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1); + int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1); + long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY, + DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT); + int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF, + DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT); + String trashPolicy = conf.get(DFS_ROUTER_FEDERATION_RENAME_TRASH, + DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT); + FedBalanceConfigs.TrashOption trashOpt = + FedBalanceConfigs.TrashOption.valueOf(trashPolicy.toUpperCase()); + // Construct job context. + FedBalanceContext context = + new FedBalanceContext.Builder(srcPath, dstPath, NO_MOUNT, conf) + .setForceCloseOpenFiles(forceCloseOpen) + .setUseMountReadOnly(true) + .setMapNum(map) + .setBandwidthLimit(bandwidth) + .setTrash(trashOpt) + .setDelayDuration(delay) + .setDiffThreshold(diff) + .build(); + + LOG.info(context.toString()); + // Construct the balance job. + BalanceJob.Builder builder = new BalanceJob.Builder<>(); + DistCpProcedure dcp = + new DistCpProcedure(DISTCP_PROCEDURE, null, delay, context); + builder.nextProcedure(dcp); + TrashProcedure tp = + new TrashProcedure(TRASH_PROCEDURE, null, delay, context); + builder.nextProcedure(tp); + return builder.build(); + } + + public int getRouterFederationRenameCount() { + return routerRenameCounter.get(); + } + + void countIncrement() { + routerRenameCounter.incrementAndGet(); + } + + void countDecrement() { + routerRenameCounter.decrementAndGet(); + } + + static void checkConfiguration(Configuration conf) throws IOException { + int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1); + int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1); + long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY, + DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT); + int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF, + DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT); + if (map < 0) { + throw new IOException("map=" + map + " is negative. Please check " + + DFS_ROUTER_FEDERATION_RENAME_MAP); + } else if (bandwidth < 0) { + throw new IOException( + "bandwidth=" + bandwidth + " is negative. Please check " + + DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH); + } else if (delay < 0) { + throw new IOException("delay=" + delay + " is negative. Please check " + + DFS_ROUTER_FEDERATION_RENAME_DELAY); + } else if (diff < 0) { + throw new IOException("diff=" + diff + " is negative. Please check " + + DFS_ROUTER_FEDERATION_RENAME_DIFF); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index a8cb5c6ce8..1d0800e4bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -28,12 +28,18 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Array; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -50,6 +56,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -165,6 +173,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; @@ -238,6 +247,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, /** DN type -> full DN report. */ private final LoadingCache dnCache; + /** Specify the option of router federation rename. */ + private RouterRenameOption routerRenameOption; + /** Schedule the router federation rename jobs. */ + private BalanceProcedureScheduler fedRenameScheduler; /** * Construct a router RPC server. * @@ -397,6 +410,57 @@ public RouterRpcServer(Configuration configuration, Router router, .forEach((key) -> this.dnCache.refresh(key)), 0, dnCacheExpire, TimeUnit.MILLISECONDS); + initRouterFedRename(); + } + + /** + * Init the router federation rename environment. Each router has its own + * journal path. + * In HA mode the journal path is: + * JOURNAL_BASE/nsId/namenodeId + * e.g. + * /journal/router-namespace/host0 + * In non-ha mode the journal path is based on ip and port: + * JOURNAL_BASE/host_port + * e.g. + * /journal/0.0.0.0_8888 + */ + private void initRouterFedRename() throws IOException { + routerRenameOption = RouterRenameOption.valueOf( + conf.get(DFS_ROUTER_FEDERATION_RENAME_OPTION, + DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT).toUpperCase()); + switch (routerRenameOption) { + case DISTCP: + RouterFederationRename.checkConfiguration(conf); + Configuration sConf = new Configuration(conf); + URI journalUri; + try { + journalUri = new URI(sConf.get(SCHEDULER_JOURNAL_URI)); + } catch (URISyntaxException e) { + throw new IOException("Bad journal uri. Please check configuration for " + + SCHEDULER_JOURNAL_URI); + } + Path child; + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + String namenodeId = HAUtil.getNameNodeId(conf, nsId); + InetSocketAddress listenAddress = this.rpcServer.getListenerAddress(); + if (nsId == null || namenodeId == null) { + child = new Path( + listenAddress.getHostName() + "_" + listenAddress.getPort()); + } else { + child = new Path(nsId, namenodeId); + } + String routerJournal = new Path(journalUri.toString(), child).toString(); + sConf.set(SCHEDULER_JOURNAL_URI, routerJournal); + fedRenameScheduler = new BalanceProcedureScheduler(sConf); + fedRenameScheduler.init(true); + break; + case NONE: + fedRenameScheduler = null; + break; + default: + break; + } } @Override @@ -432,9 +496,20 @@ protected void serviceStop() throws Exception { if (securityManager != null) { this.securityManager.stop(); } + if (this.fedRenameScheduler != null) { + fedRenameScheduler.shutDown(); + } super.serviceStop(); } + boolean isEnableRenameAcrossNamespace() { + return routerRenameOption != RouterRenameOption.NONE; + } + + BalanceProcedureScheduler getFedRenameScheduler() { + return this.fedRenameScheduler; + } + /** * Get the RPC security manager. * @@ -1889,6 +1964,17 @@ public String[] getGroupsForUser(String user) throws IOException { return routerProto.getGroupsForUser(user); } + public int getRouterFederationRenameCount() { + return clientProto.getRouterFederationRenameCount(); + } + + public int getSchedulerJobCount() { + if (fedRenameScheduler == null) { + return 0; + } + return fedRenameScheduler.getAllJobs().size(); + } + /** * Deals with loading datanode report into the cache and refresh. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 8c171854e8..2c397d2d76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -702,4 +702,63 @@ concurrent calls. + + + dfs.federation.router.federation.rename.bandwidth + + + Specify bandwidth per map in MB. + + + + + dfs.federation.router.federation.rename.map + + + Max number of concurrent maps to use for copy. + + + + + dfs.federation.router.federation.rename.delay + 1000 + + Specify the delayed duration(millie seconds) when the job needs to retry. + + + + + dfs.federation.router.federation.rename.diff + 0 + + Specify the threshold of the diff entries that used in incremental copy + stage. + + + + + dfs.federation.router.federation.rename.option + NONE + + Specify the action when rename across namespaces. The option can be NONE + and DISTCP. + + + + + dfs.federation.router.federation.rename.force.close.open.file + true + + Force close all open files when there is no diff in the DIFF_DISTCP stage. + + + + + dfs.federation.router.federation.rename.trash + trash + + This options has 3 values: trash (move the source path to trash), delete + (delete the source path directly) and skip (skip both trash and deletion). + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index 702fa44974..d7838c75f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -509,4 +509,19 @@ Metrics ------- The Router and State Store statistics are exposed in metrics/JMX. These info will be very useful for monitoring. -More metrics info can see [RBF Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RBFMetrics), [Router RPC Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RouterRPCMetrics) and [State Store Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#StateStoreMetrics). \ No newline at end of file +More metrics info can see [RBF Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RBFMetrics), [Router RPC Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RouterRPCMetrics) and [State Store Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#StateStoreMetrics). + +Router Federation Rename +------- + +Enable Router to rename across namespaces. Currently the router federation rename is implemented by distcp. We must set the rpc timeout high enough so it won't timeout. + +| Property | Default | Description| +|:---- |:---- |:---- | +| dfs.federation.router.federation.rename.option | NONE | Specify the action when rename across namespaces. The option can be NONE(reject rename across namespaces) and DISTCP(rename across namespaces with distcp). | +| dfs.federation.router.federation.rename.force.close.open.file | true | Force close all open files when there is no diff in the DIFF_DISTCP stage.| +| dfs.federation.router.federation.rename.map | | Max number of concurrent maps to use for copy.| +| dfs.federation.router.federation.rename.bandwidth | | Specify bandwidth per map in MB.| +| dfs.federation.router.federation.rename.delay | 1000 | Specify the delayed duration(millie seconds) when the job needs to retry.| +| dfs.federation.router.federation.rename.diff | 0 | Specify the threshold of the diff entries that used in incremental copy stage.| +| dfs.federation.router.federation.rename.trash | trash | This options has 3 values: trash (move the source path to trash), delete (delete the source path directly) and skip (skip both trash and deletion).| \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 3a366171b9..8b9ff10630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -17,11 +17,17 @@ */ package org.apache.hadoop.hdfs.server.federation; +import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; + /** * Constructs a router configuration with individual features enabled/disabled. */ @@ -38,7 +44,9 @@ public class RouterConfigBuilder { private boolean enableMetrics = false; private boolean enableQuota = false; private boolean enableSafemode = false; + private RouterRenameOption routerRenameOption = RouterRenameOption.NONE; private boolean enableCacheRefresh; + private Map innerMap = new HashMap<>(); public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -95,6 +103,11 @@ public RouterConfigBuilder metrics(boolean enable) { return this; } + public RouterConfigBuilder routerRenameOption(RouterRenameOption option) { + this.routerRenameOption = option; + return this; + } + public RouterConfigBuilder quota(boolean enable) { this.enableQuota = enable; return this; @@ -138,6 +151,10 @@ public RouterConfigBuilder metrics() { return this.metrics(true); } + public RouterConfigBuilder routerRenameOption() { + return this.routerRenameOption(RouterRenameOption.DISTCP); + } + public RouterConfigBuilder quota() { return this.quota(true); } @@ -150,6 +167,13 @@ public RouterConfigBuilder refreshCache() { return this.refreshCache(true); } + public RouterConfigBuilder set(String key, String value) { + if (key != null && value != null) { + innerMap.put(key, value); + } + return this; + } + public Configuration build() { conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, this.enableStateStore); @@ -183,6 +207,10 @@ public Configuration build() { this.enableSafemode); conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, this.enableCacheRefresh); + conf.set(DFS_ROUTER_FEDERATION_RENAME_OPTION, routerRenameOption.name()); + for (Map.Entry kv : innerMap.entrySet()) { + conf.set(kv.getKey(), kv.getValue()); + } return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java new file mode 100644 index 0000000000..c47098f337 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java @@ -0,0 +1,455 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP; +import static org.apache.hadoop.test.GenericTestUtils.getMethodName; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; +import static org.junit.Assert.*; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.tools.fedbalance.DistCpProcedure; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Basic tests of router federation rename. Rename across namespaces. + */ +public class TestRouterFederationRename { + + private static final int NUM_SUBCLUSTERS = 2; + private static final int NUM_DNS = 6; + + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + + /** Random Router for this federated cluster. */ + private RouterContext router; + + /** Random nameservice in the federated cluster. */ + private String ns; + /** Filesystem interface to the Router. */ + private FileSystem routerFS; + /** Filesystem interface to the Namenode. */ + private FileSystem nnFS; + /** File in the Namenode. */ + private String nnFile; + + @BeforeClass + public static void globalSetUp() throws Exception { + Configuration namenodeConf = new Configuration(); + namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY, + true); + cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS); + cluster.setNumDatanodesPerNameservice(NUM_DNS); + cluster.addNamenodeOverrides(namenodeConf); + cluster.setIndependentDNs(); + + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5); + cluster.addNamenodeOverrides(conf); + // Start NNs and DNs and wait until ready. + cluster.startCluster(); + + // Start routers, enable router federation rename. + String journal = "hdfs://" + cluster.getCluster().getNameNode(1) + .getClientNamenodeAddress() + "/journal"; + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .routerRenameOption() + .set(SCHEDULER_JOURNAL_URI, journal) + .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1") + .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1") + .build(); + // We decrease the DN cache times to make the test faster. + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + // We decrease the DN heartbeat expire interval to make them dead faster + cluster.getCluster().getNamesystem(0).getBlockManager() + .getDatanodeManager().setHeartbeatInterval(1); + cluster.getCluster().getNamesystem(1).getBlockManager() + .getDatanodeManager().setHeartbeatInterval(1); + cluster.getCluster().getNamesystem(0).getBlockManager() + .getDatanodeManager().setHeartbeatExpireInterval(3000); + cluster.getCluster().getNamesystem(1).getBlockManager() + .getDatanodeManager().setHeartbeatExpireInterval(3000); + DistCpProcedure.enableForTest(); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + DistCpProcedure.disableForTest(); + } + + @Before + public void testSetup() throws Exception { + + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Random router for this test + RouterContext rndRouter = cluster.getRandomRouter(); + this.setRouter(rndRouter); + + // Create a mount that points to 2 dirs in the same ns: + // /same + // ns0 -> / + // ns0 -> /target-ns0 + for (RouterContext rc : cluster.getRouters()) { + Router r = rc.getRouter(); + MockResolver resolver = (MockResolver) r.getSubclusterResolver(); + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + resolver.addLocation("/same", ns0, "/"); + resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0)); + } + + // Pick a namenode for this test + String ns0 = cluster.getNameservices().get(0); + this.setNs(ns0); + this.setNamenode(cluster.getNamenode(ns0, null)); + + // Create a test file on the NN + Random rnd = new Random(); + String randomFile = "testfile-" + rnd.nextInt(); + this.nnFile = + cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; + + createFile(nnFS, nnFile, 32); + verifyFileExists(nnFS, nnFile); + } + + protected void createDir(FileSystem fs, String dir) throws IOException { + fs.mkdirs(new Path(dir)); + String file = dir + "/file"; + createFile(fs, file, 32); + verifyFileExists(fs, dir); + verifyFileExists(fs, file); + } + + protected void testRenameDir(RouterContext testRouter, String path, + String renamedPath, boolean exceptionExpected, Callable call) + throws IOException { + createDir(testRouter.getFileSystem(), path); + // rename + boolean exceptionThrown = false; + try { + call.call(); + assertFalse(verifyFileExists(testRouter.getFileSystem(), path)); + assertTrue( + verifyFileExists(testRouter.getFileSystem(), renamedPath + "/file")); + } catch (Exception ex) { + exceptionThrown = true; + assertTrue(verifyFileExists(testRouter.getFileSystem(), path + "/file")); + assertFalse(verifyFileExists(testRouter.getFileSystem(), renamedPath)); + } finally { + FileContext fileContext = testRouter.getFileContext(); + fileContext.delete(new Path(path), true); + fileContext.delete(new Path(renamedPath), true); + } + if (exceptionExpected) { + // Error was expected. + assertTrue(exceptionThrown); + } else { + // No error was expected. + assertFalse(exceptionThrown); + } + } + + protected void setRouter(RouterContext r) throws IOException { + this.router = r; + this.routerFS = r.getFileSystem(); + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + protected void setNamenode(NamenodeContext nn) throws IOException { + this.nnFS = nn.getFileSystem(); + } + + protected FileSystem getRouterFileSystem() { + return this.routerFS; + } + + @Test + public void testSuccessfulRbfRename() throws Exception { + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Test successfully rename a dir to a destination that is in a different + // namespace. + String dir = + cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName(); + String renamedDir = + cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName(); + testRenameDir(router, dir, renamedDir, false, () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(dir, renamedDir); + return null; + }); + testRenameDir(router, dir, renamedDir, false, () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(dir, renamedDir); + return null; + }); + } + + @Test + public void testRbfRenameFile() throws Exception { + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Test router federation rename a file. + String file = + cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName(); + String renamedFile = + cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName(); + createFile(routerFS, file, 32); + getRouterFileSystem().mkdirs(new Path(renamedFile)); + LambdaTestUtils.intercept(RemoteException.class, "should be a directory", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(file, renamedFile); + return null; + }); + LambdaTestUtils.intercept(RemoteException.class, "should be a directory", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(file, renamedFile); + return null; + }); + getRouterFileSystem().delete(new Path(file), true); + getRouterFileSystem().delete(new Path(renamedFile), true); + } + + @Test + public void testRbfRenameWhenDstAlreadyExists() throws Exception { + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Test router federation rename a path to a destination that is in a + // different namespace and already exists. + String dir = + cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName(); + String renamedDir = + cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName(); + createDir(routerFS, dir); + getRouterFileSystem().mkdirs(new Path(renamedDir)); + LambdaTestUtils.intercept(RemoteException.class, "already exists", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(dir, renamedDir); + return null; + }); + LambdaTestUtils.intercept(RemoteException.class, "already exists", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(dir, renamedDir); + return null; + }); + getRouterFileSystem().delete(new Path(dir), true); + getRouterFileSystem().delete(new Path(renamedDir), true); + } + + @Test + public void testRbfRenameWhenSrcNotExists() throws Exception { + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Test router federation rename un-existed path. + String dir = + cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName(); + String renamedDir = + cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName(); + LambdaTestUtils.intercept(RemoteException.class, "File does not exist", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(dir, renamedDir); + return null; + }); + LambdaTestUtils.intercept(RemoteException.class, "File does not exist", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(dir, renamedDir); + return null; + }); + } + + @Test + public void testRbfRenameOfMountPoint() throws Exception { + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Test router federation rename a mount point. + String dir = cluster.getFederatedPathForNS(ns0); + String renamedDir = cluster.getFederatedPathForNS(ns1); + LambdaTestUtils.intercept(RemoteException.class, "is a mount point", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(dir, renamedDir); + return null; + }); + LambdaTestUtils.intercept(RemoteException.class, "is a mount point", + "Expect RemoteException.", () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(dir, renamedDir); + return null; + }); + } + + @Test + public void testRbfRenameWithMultiDestination() throws Exception { + List nss = cluster.getNameservices(); + String ns1 = nss.get(1); + FileSystem rfs = getRouterFileSystem(); + + // Test router federation rename a path with multi-destination. + String dir = "/same/" + getMethodName(); + String renamedDir = cluster.getFederatedTestDirectoryForNS(ns1) + "/" + + getMethodName(); + createDir(rfs, dir); + getRouterFileSystem().mkdirs(new Path(renamedDir)); + LambdaTestUtils.intercept(RemoteException.class, + "The remote location should be exactly one", "Expect RemoteException.", + () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(dir, renamedDir); + return null; + }); + LambdaTestUtils.intercept(RemoteException.class, + "The remote location should be exactly one", "Expect RemoteException.", + () -> { + DFSClient client = router.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(dir, renamedDir); + return null; + }); + getRouterFileSystem().delete(new Path(dir), true); + getRouterFileSystem().delete(new Path(renamedDir), true); + } + + @Test(timeout = 10000) + public void testCounter() throws Exception { + final RouterRpcServer rpcServer = router.getRouter().getRpcServer(); + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + RouterFederationRename rbfRename = + Mockito.spy(new RouterFederationRename(rpcServer, router.getConf())); + String path = "/src"; + createDir(cluster.getCluster().getFileSystem(0), path); + // Watch the scheduler job count. + int expectedSchedulerCount = rpcServer.getSchedulerJobCount() + 1; + AtomicInteger maxSchedulerCount = new AtomicInteger(); + AtomicBoolean watch = new AtomicBoolean(true); + Thread watcher = new Thread(() -> { + while (watch.get()) { + int schedulerCount = rpcServer.getSchedulerJobCount(); + if (schedulerCount > maxSchedulerCount.get()) { + maxSchedulerCount.set(schedulerCount); + } + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + }); + watcher.start(); + // Trigger rename. + rbfRename.routerFedRename("/src", "/dst", + Arrays.asList(new RemoteLocation(ns0, path, null)), + Arrays.asList(new RemoteLocation(ns1, path, null))); + // Verify count. + verify(rbfRename).countIncrement(); + verify(rbfRename).countDecrement(); + watch.set(false); + watcher.interrupt(); + watcher.join(); + assertEquals(expectedSchedulerCount, maxSchedulerCount.get()); + // Clean up. + assertFalse(cluster.getCluster().getFileSystem(0).exists(new Path(path))); + assertTrue( + cluster.getCluster().getFileSystem(1).delete(new Path(path), true)); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java index fa4a088631..1a892991c8 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java @@ -112,6 +112,14 @@ enum Stage { @VisibleForTesting static boolean enabledForTest = false; + public static void enableForTest() { + enabledForTest = true; + } + + public static void disableForTest() { + enabledForTest = false; + } + public DistCpProcedure() { } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java index 64805c0518..1c9f9c0db1 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java @@ -59,9 +59,9 @@ public class FedBalance extends Configured implements Tool { LoggerFactory.getLogger(FedBalance.class); private static final String SUBMIT_COMMAND = "submit"; private static final String CONTINUE_COMMAND = "continue"; - private static final String NO_MOUNT = "no-mount"; - private static final String DISTCP_PROCEDURE = "distcp-procedure"; - private static final String TRASH_PROCEDURE = "trash-procedure"; + public static final String NO_MOUNT = "no-mount"; + public static final String DISTCP_PROCEDURE = "distcp-procedure"; + public static final String TRASH_PROCEDURE = "trash-procedure"; public static final String FED_BALANCE_DEFAULT_XML = "hdfs-fedbalance-default.xml"; @@ -70,7 +70,7 @@ public class FedBalance extends Configured implements Tool { /** * This class helps building the balance job. */ - private class Builder { + private final class Builder { /* Force close all open files while there is no diff. */ private boolean forceCloseOpen = false; /* Max number of concurrent maps to use for copy. */ @@ -88,7 +88,7 @@ private class Builder { /* The dst input. This specifies the dst path. */ private final String inputDst; - Builder(String inputSrc, String inputDst) { + private Builder(String inputSrc, String inputDst) { this.inputSrc = inputSrc; this.inputDst = inputDst; } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java index ec47a94227..2a49ecc9e6 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java @@ -176,21 +176,34 @@ public int hashCode() { @Override public String toString() { - StringBuilder builder = new StringBuilder("FedBalance context:"); - builder.append(" src=").append(src); - builder.append(", dst=").append(dst); + StringBuilder builder = new StringBuilder(); + builder.append("Move ").append(src).append(" to ").append(dst); if (useMountReadOnly) { - builder.append(", router-mode=true"); - builder.append(", mount-point=").append(mount); + builder.append(" using router mode, mount point=").append(mount) + .append("."); } else { - builder.append(", router-mode=false"); + builder.append(" using normal federation mode."); } - builder.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles); - builder.append(", trash=").append(trashOpt.name()); - builder.append(", map=").append(mapNum); - builder.append(", bandwidth=").append(bandwidthLimit); - builder.append(", delayDuration=").append(delayDuration); - builder.append(", diffThreshold=").append(diffThreshold); + builder.append(" Submit distcp job with map=").append(mapNum) + .append(" and bandwidth=").append(bandwidthLimit).append("."); + builder.append(" When the diff count is no greater than ") + .append(diffThreshold); + if (forceCloseOpenFiles) { + builder.append(", force close all open files."); + } else { + builder.append(", wait until there is no open files."); + } + switch (trashOpt) { + case DELETE: + builder.append(" Delete the src after the job is complete."); + break; + case TRASH: + builder.append(" Move the src to trash after the job is complete."); + break; + default: + break; + } + builder.append(" Delay duration is ").append(delayDuration).append("ms."); return builder.toString(); } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java index 78e5ac7231..33500bc10e 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java @@ -292,6 +292,7 @@ private void recoverAllJobs() throws IOException { for (BalanceJob job : jobs) { recoverQueue.add(job); jobSet.put(job, job); + LOG.info("Recover federation balance job {}.", job); } } diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java index 9f554af2e6..8e282d1eb3 100644 --- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java @@ -76,7 +76,7 @@ public class TestDistCpProcedure { @BeforeClass public static void beforeClass() throws IOException { - DistCpProcedure.enabledForTest = true; + DistCpProcedure.enableForTest(); conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); @@ -92,7 +92,7 @@ public static void beforeClass() throws IOException { @AfterClass public static void afterClass() { - DistCpProcedure.enabledForTest = false; + DistCpProcedure.disableForTest(); if (cluster != null) { cluster.shutdown(); }