diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java index 223b77758d..33d37be35b 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java @@ -89,6 +89,8 @@ enum Stage { private boolean forceCloseOpenFiles; /* Disable write by setting the mount point readonly. */ private boolean useMountReadOnly; + /* Max number of diff entries tolerated before moving to DISABLE_WRITE. */ + private int diffThreshold; private FsPermission fPerm; // the permission of the src. private AclStatus acl; // the acl of the src. @@ -134,6 +136,7 @@ public DistCpProcedure(String name, String nextProcedure, long delayDuration, this.bandWidth = context.getBandwidthLimit(); this.forceCloseOpenFiles = context.getForceCloseOpenFiles(); this.useMountReadOnly = context.getUseMountReadOnly(); + this.diffThreshold = context.getDiffThreshold(); srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf); dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf); } @@ -227,12 +230,8 @@ void diffDistCp() throws IOException, RetryException { } else { throw new RetryException(); // wait job complete. } - } else if (!verifyDiff()) { - if (!verifyOpenFiles() || forceCloseOpenFiles) { - updateStage(Stage.DISABLE_WRITE); - } else { - throw new RetryException(); - } + } else if (diffDistCpStageDone()) { + updateStage(Stage.DISABLE_WRITE); } else { submitDiffDistCp(); } @@ -372,14 +371,38 @@ private void closeAllOpenFiles(DistributedFileSystem dfs, Path path) } /** - * Verify whether the src has changed since CURRENT_SNAPSHOT_NAME snapshot. + * Check whether the conditions are satisfied for moving to the next stage. 
+ * If the number of diff entries is no greater than the threshold and the + * open files can be force closed or there is no open file, then move to the + * next stage. * - * @return true if the src has changed. + * @return true if it is time to move to the next stage, false if the diff + * size is still above the threshold. + * @throws RetryException if the diff size is within the threshold but + * verifyOpenFiles() returns true and force close is disabled. */ - private boolean verifyDiff() throws IOException { + @VisibleForTesting + boolean diffDistCpStageDone() throws IOException, RetryException { + int diffSize = getDiffSize(); + if (diffSize <= diffThreshold) { + if (forceCloseOpenFiles || !verifyOpenFiles()) { + return true; + } else { + throw new RetryException(); + } + } + return false; + } + + /** + * Get the number of entries in the snapshot diff report of the src. + * + * @return the size of the diff list taken against CURRENT_SNAPSHOT_NAME. + */ + private int getDiffSize() throws IOException { SnapshotDiffReport diffReport = srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, ""); - return diffReport.getDiffList().size() > 0; + return diffReport.getDiffList().size(); } /** diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java index ca6b0dfd37..c8507980c8 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java @@ -51,6 +51,7 @@ import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.TRASH; import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DELAY_DURATION; import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.CLI_OPTIONS; +import static org.apache.hadoop.tools.fedbalance.FedBalanceOptions.DIFF_THRESHOLD; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; 
/** @@ -91,6 +92,8 @@ private class Builder { private TrashOption trashOpt = TrashOption.TRASH; /* Specify the duration(millie seconds) when the procedure needs retry. */ private long delayDuration = TimeUnit.SECONDS.toMillis(1); + /* The threshold of diff entries; 0 waits until there is no diff. */ + private int diffThreshold = 0; /* The source input. This specifies the source path. */ private final String inputSrc; /* The dst input. This specifies the dst path. */ @@ -155,6 +158,15 @@ public Builder setDelayDuration(long value) { return this; } + /** + * Specify the threshold of diff entries. + * @param value the diff entries threshold, e.g. from -diffThreshold. + */ + public Builder setDiffThreshold(int value) { + this.diffThreshold = value; + return this; + } + /** * Build the balance job. */ @@ -172,7 +184,8 @@ public BalanceJob build() throws IOException { .setForceCloseOpenFiles(forceCloseOpen) .setUseMountReadOnly(routerCluster).setMapNum(map) .setBandwidthLimit(bandwidth).setTrash(trashOpt) - .setDelayDuration(delayDuration).build(); + .setDelayDuration(delayDuration) + .setDiffThreshold(diffThreshold).build(); } else { // normal federation cluster. 
Path src = new Path(inputSrc); if (src.toUri().getAuthority() == null) { @@ -181,7 +194,8 @@ public BalanceJob build() throws IOException { context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf()) .setForceCloseOpenFiles(forceCloseOpen) .setUseMountReadOnly(routerCluster).setMapNum(map) - .setBandwidthLimit(bandwidth).setTrash(trashOpt).build(); + .setBandwidthLimit(bandwidth).setTrash(trashOpt) + .setDiffThreshold(diffThreshold).build(); } LOG.info(context.toString()); @@ -290,6 +304,10 @@ private int submit(CommandLine command, String inputSrc, String inputDst) builder.setDelayDuration( Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt()))); } + if (command.hasOption(DIFF_THRESHOLD.getOpt())) { + builder.setDiffThreshold(Integer.parseInt( + command.getOptionValue(DIFF_THRESHOLD.getOpt()))); + } if (command.hasOption(TRASH.getOpt())) { String val = command.getOptionValue(TRASH.getOpt()); if (val.equalsIgnoreCase("skip")) { diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java index 56be7db48e..f4f570026f 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java @@ -54,6 +54,8 @@ public class FedBalanceContext implements Writable { private TrashOption trashOpt; /* How long will the procedures be delayed. */ private long delayDuration; + /* The threshold of diff entries. 
*/ + private int diffThreshold; private Configuration conf; @@ -91,6 +93,10 @@ public int getBandwidthLimit() { return bandwidthLimit; } + public int getDiffThreshold() { + return diffThreshold; + } + public TrashOption getTrashOpt() { return trashOpt; } @@ -107,6 +113,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(bandwidthLimit); out.writeInt(trashOpt.ordinal()); out.writeLong(delayDuration); + out.writeInt(diffThreshold); } @Override @@ -122,6 +129,7 @@ public void readFields(DataInput in) throws IOException { bandwidthLimit = in.readInt(); trashOpt = TrashOption.values()[in.readInt()]; delayDuration = in.readLong(); + diffThreshold = in.readInt(); } @Override @@ -146,6 +154,7 @@ public boolean equals(Object obj) { .append(bandwidthLimit, bc.bandwidthLimit) .append(trashOpt, bc.trashOpt) .append(delayDuration, bc.delayDuration) + .append(diffThreshold, bc.diffThreshold) .isEquals(); } @@ -161,6 +170,7 @@ public int hashCode() { .append(bandwidthLimit) .append(trashOpt) .append(delayDuration) + .append(diffThreshold) .build(); } @@ -180,6 +190,7 @@ public String toString() { builder.append(", map=").append(mapNum); builder.append(", bandwidth=").append(bandwidthLimit); builder.append(", delayDuration=").append(delayDuration); + builder.append(", diffThreshold=").append(diffThreshold); return builder.toString(); } @@ -194,6 +205,7 @@ static class Builder { private int bandwidthLimit; private TrashOption trashOpt; private long delayDuration; + private int diffThreshold; /** * This class helps building the FedBalanceContext. @@ -263,6 +275,14 @@ public Builder setDelayDuration(long value) { return this; } + /** + * Specify the diff entries threshold consumed by DistCpProcedure. + */ + public Builder setDiffThreshold(int value) { + this.diffThreshold = value; + return this; + } + /** * Build the FedBalanceContext. 
* @@ -280,6 +300,7 @@ public FedBalanceContext build() { context.bandwidthLimit = this.bandwidthLimit; context.trashOpt = this.trashOpt; context.delayDuration = this.delayDuration; + context.diffThreshold = this.diffThreshold; return context; } } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java index 71a7d9db00..d7be6a8157 100644 --- a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceOptions.java @@ -71,6 +71,17 @@ private FedBalanceOptions() {} + " needs to retry. A job may retry many times and check the state" + " when it waits for the distcp job to finish."); + /** + * CLI option for the threshold of diff entries; the default value is 0. + */ + final static Option DIFF_THRESHOLD = new Option("diffThreshold", true, + "This specifies the threshold of the diff entries that used in" + + " incremental copy stage. If the diff entries size is no greater" + + " than this threshold and the open files check is satisfied" + + "(no open files or force close all open files), the fedBalance will" + + " go to the final round of distcp. Default value is 0, that means" + + " waiting until there is no diff."); + /** * Move the source path to trash after all the data are sync to target, or * delete the source directly, or skip both trash and deletion. 
diff --git a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md index ff42eaf552..03e6e60e57 100644 --- a/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md +++ b/hadoop-tools/hadoop-federation-balance/src/site/markdown/HDFSFederationBalance.md @@ -101,6 +101,7 @@ Command `submit` has 5 options: | -bandwidth | Specify bandwidth per map in MB. | 10 | | -delay | Specify the delayed duration(millie seconds) when the job needs to retry. | 1000 | | -moveToTrash | This options has 3 values: `trash` (move the source path to trash), `delete` (delete the source path directly) and `skip` (skip both trash and deletion). By default the server side trash interval is used. If the trash is disabled in the server side, the default trash interval 60 minutes is used. | trash | +| -diffThreshold | Specify the threshold of diff entries used in the incremental copy stage. If the number of diff entries is no greater than the threshold and the open files check is satisfied (no open files, or all open files are force closed), the fedBalance will go to the final round of distcp. 
Setting to 0 means waiting until there is no diff.| 0 | ### Configuration Options -------------------- diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java index ec565c36d8..ea5a8a0280 100644 --- a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java @@ -171,6 +171,33 @@ public void testInitDistCp() throws Exception { cleanup(fs, new Path(testRoot)); } + @Test + public void testDiffThreshold() throws Exception { + String testRoot = nnUri + "/user/foo/testdir." + getMethodName(); + DistributedFileSystem fs = + (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf); + createFiles(fs, testRoot, srcfiles); + Path src = new Path(testRoot, SRCDAT); + Path dst = new Path(testRoot, DSTDAT); + + FedBalanceContext context = buildContext(src, dst, MOUNT, 10); + DistCpProcedure dcProcedure = + new DistCpProcedure("distcp-procedure", null, 1000, context); + executeProcedure(dcProcedure, Stage.DIFF_DISTCP, + () -> dcProcedure.initDistCp()); + // Keep the number of diff entries no greater than the threshold (10). + Path lastPath = new Path(src, "a"); + for (int i = 0; i < 5; i++) { + Path newPath = new Path(src, "a-" + i); + fs.rename(lastPath, newPath); + lastPath = newPath; + assertTrue(dcProcedure.diffDistCpStageDone()); + executeProcedure(dcProcedure, Stage.DISABLE_WRITE, + () -> dcProcedure.diffDistCp()); + } + cleanup(fs, new Path(testRoot)); + } + @Test(timeout = 30000) public void testDiffDistCp() throws Exception { String testRoot = nnUri + "/user/foo/testdir." 
+ getMethodName(); @@ -351,9 +378,14 @@ public void testDisableWrite() throws Exception { } private FedBalanceContext buildContext(Path src, Path dst, String mount) { + return buildContext(src, dst, mount, 0); + } + + private FedBalanceContext buildContext(Path src, Path dst, String mount, + int diffThreshold) { return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10) .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000) - .build(); + .setDiffThreshold(diffThreshold).build(); } interface Call {