diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml index 054d8c0ace..db744f511d 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml @@ -47,6 +47,14 @@ /libexec/shellprofile.d 0755 + + ../hadoop-federation-balance/src/main/shellprofile.d + + * + + /libexec/shellprofile.d + 0755 + ../hadoop-extras/src/main/shellprofile.d @@ -111,6 +119,13 @@ *-sources.jar + + ../hadoop-federation-balance/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + ../hadoop-extras/target /share/hadoop/${hadoop.component}/sources diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index ab61e50450..a025b9bad2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -105,6 +105,8 @@ public byte value() { public static final String DOT_SNAPSHOT_DIR = ".snapshot"; public static final String SEPARATOR_DOT_SNAPSHOT_DIR = Path.SEPARATOR + DOT_SNAPSHOT_DIR; + public static final String DOT_SNAPSHOT_DIR_SEPARATOR = + DOT_SNAPSHOT_DIR + Path.SEPARATOR; public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR = Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR; public final static String DOT_RESERVED_STRING = ".reserved"; diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index f3a3d76a64..48928b508e 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -327,6 +327,12 @@ ${hadoop.version} test-jar + + org.apache.hadoop + hadoop-hdfs-rbf + ${hadoop.version} + test-jar + org.apache.hadoop hadoop-mapreduce-client-app @@ -578,6 +584,17 @@ ${hadoop.version} test-jar + + org.apache.hadoop + hadoop-federation-balance + ${hadoop.version} + + + org.apache.hadoop + hadoop-federation-balance + ${hadoop.version} + test-jar + org.apache.hadoop hadoop-datajoin diff --git a/hadoop-tools/hadoop-federation-balance/pom.xml b/hadoop-tools/hadoop-federation-balance/pom.xml new file mode 100644 index 0000000000..cf79e17c5a --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/pom.xml @@ -0,0 +1,249 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.4.0-SNAPSHOT + ../../hadoop-project + + hadoop-federation-balance + 3.4.0-SNAPSHOT + Apache Hadoop Federation Balance + Apache Hadoop Federation Balance + jar + + + UTF-8 + true + + + + + junit + junit + test + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-annotations + provided + + + org.apache.hadoop + hadoop-mapreduce-client-app + test + + + org.apache.hadoop + hadoop-mapreduce-client-hs + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + provided + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + test-jar + + + org.apache.hadoop + hadoop-hdfs-client + provided + + + org.apache.hadoop + hadoop-hdfs + provided + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + + + org.apache.hadoop + hadoop-distcp + provided + + + org.apache.hadoop + hadoop-common + test + test-jar + + + org.apache.hadoop + hadoop-hdfs-rbf + provided + + + org.apache.hadoop + hadoop-hdfs-rbf + test + 
test-jar + + + org.mockito + mockito-core + test + + + org.assertj + assertj-core + test + + + org.apache.hadoop + hadoop-minicluster + provided + + + + + + + src/main/resources + true + + + + + src/test/resources + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${ignoreTestFailure} + 1 + false + 600 + -Xmx1024m + + **/Test*.java + + true + + + test.build.data + ${basedir}/target/test/data + + + hadoop.log.dir + target/test/logs + + + org.apache.commons.logging.Log + org.apache.commons.logging.impl.SimpleLog + + + org.apache.commons.logging.simplelog.defaultlog + warn + + + + + + maven-dependency-plugin + + + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + deplist + compile + + list + + + + ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.hadoop.tools.fedbalance.FedBalance + + + + + + prepare-jar + prepare-package + + jar + + + + prepare-test-jar + prepare-package + + test-jar + + + + + + org.apache.maven.plugins + maven-source-plugin + + true + + + + + jar + + + + + + + diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java new file mode 100644 index 0000000000..704ffd9dcc --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpBalanceOptions.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.fedbalance; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +/** + * Command line options of FedBalance. + */ +public final class DistCpBalanceOptions { + + /** + * The private construct protects this class from being instantiated. + */ + private DistCpBalanceOptions() {} + + /** + * Run in router-based federation mode. + */ + final static Option ROUTER = new Option("router", false, + "If `true` the command runs in router mode. The source path is " + + "taken as a mount point. It will disable write by setting the mount" + + " point readonly. Otherwise the command works in normal federation" + + " mode. The source path is taken as the full path. It will disable" + + " write by cancelling all permissions of the source path. The" + + " default value is `true`."); + + /** + * If true, in DIFF_DISTCP stage it will force close all open files when + * there is no diff between the source path and the dst path. Otherwise + * the DIFF_DISTCP stage will wait until there is no open files. The + * default value is `false`. 
+   */
+  final static Option FORCE_CLOSE_OPEN = new Option("forceCloseOpen", false,
+      "Force close all open files if the src and dst are synced.");
+
+  /**
+   * Max number of maps to use during copy. DistCp will split work as equally
+   * as possible among these maps.
+   */
+  final static Option MAP =
+      new Option("map", true, "Max number of concurrent maps to use for copy");
+
+  /**
+   * Specify bandwidth per map in MB, accepts bandwidth as a fraction.
+   */
+  final static Option BANDWIDTH =
+      new Option("bandwidth", true, "Specify bandwidth per map in MB.");
+
+  /**
+   * Specify the delay duration (in milliseconds) before retrying the job.
+   */
+  final static Option DELAY_DURATION = new Option("delay", true,
+      "This specifies the delay duration (in milliseconds) when the job" +
+          " needs to retry. A job may retry many times and check the state" +
+          " when it waits for the distcp job to finish.");
+
+  /**
+   * Move the source path to trash after all the data is synced to the target,
+   * or delete the source directly, or skip both trash and deletion.
+   */
+  final static Option TRASH = new Option("moveToTrash", true,
+      "Move the source path to trash, or delete the source path directly," +
+          " or skip both trash and deletion. This accepts 3 values: trash," +
+          " delete and skip. By default the server-side trash interval is" +
+          " used. If trash is disabled on the server side, the default" +
+          " trash interval of 60 minutes is used.");
+
+  final static Options CLI_OPTIONS = new Options();
+
+  static {
+    CLI_OPTIONS.addOption(ROUTER);
+    CLI_OPTIONS.addOption(FORCE_CLOSE_OPEN);
+    CLI_OPTIONS.addOption(MAP);
+    CLI_OPTIONS.addOption(BANDWIDTH);
+    CLI_OPTIONS.addOption(DELAY_DURATION);
+    CLI_OPTIONS.addOption(TRASH);
+  }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
new file mode 100644
index 0000000000..73fecbf346
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
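Note on DistCpBalanceOptions above: the options compose with stock commons-cli
parsing, mirroring what FedBalance.run() does further down. A minimal sketch of
consuming CLI_OPTIONS — the argv values are illustrative only, and options must
precede the subcommand because parsing stops at the first non-option token:

    // Sketch; same package as DistCpBalanceOptions (CLI_OPTIONS is
    // package-private).
    String[] argv = {"-router", "-map", "20", "-moveToTrash", "skip",
        "submit", "/mount/src", "hdfs://ns-dst/dst"};
    CommandLine cmd = new GnuParser()
        .parse(DistCpBalanceOptions.CLI_OPTIONS, argv, true);
    boolean router = cmd.hasOption("router");                  // flag option
    int maps = Integer.parseInt(cmd.getOptionValue("map", "10"));
    String trash = cmd.getOptionValue("moveToTrash", "trash");
    String[] rest = cmd.getArgs(); // {"submit", "/mount/src", "hdfs://..."}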
+ */ +package org.apache.hadoop.tools.fedbalance; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.OptionsParser; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME; + +/** + * Copy data through distcp. Super user privilege needed. + * + * PRE_CHECK :pre-check of src and dst. + * INIT_DISTCP :the first round of distcp. + * DIFF_DISTCP :copy snapshot diff round by round until there is + * no diff. + * DISABLE_WRITE :disable write operations. + * FINAL_DISTCP :close all open files and do the final round distcp. + * FINISH :procedure finish. + */ +public class DistCpProcedure extends BalanceProcedure { + + public static final Logger LOG = + LoggerFactory.getLogger(DistCpProcedure.class); + + /* Stages of this procedure. */ + enum Stage { + PRE_CHECK, INIT_DISTCP, DIFF_DISTCP, DISABLE_WRITE, FINAL_DISTCP, FINISH + } + + private FedBalanceContext context; // the balance context. + private Path src; // the source path including the source cluster. + private Path dst; // the dst path including the dst cluster. + private Configuration conf; + private int mapNum; // the number of map tasks. + private int bandWidth; // the bandwidth limit of each distcp task. + private String jobId; // the id of the current distcp. + private Stage stage; // current stage of this procedure. + + /* Force close all open files when there is no diff between src and dst */ + private boolean forceCloseOpenFiles; + /* Disable write by setting the mount point readonly. */ + private boolean useMountReadOnly; + + private FsPermission fPerm; // the permission of the src. + private AclStatus acl; // the acl of the src. + + private JobClient client; + private DistributedFileSystem srcFs; // fs of the src cluster. + private DistributedFileSystem dstFs; // fs of the dst cluster. + + /** + * Test only. In unit test we use the LocalJobRunner to run the distcp jobs. + * Here we save the job to look up the job status. The localJob won't be + * serialized thus won't be recovered. + */ + @VisibleForTesting + private Job localJob; + /** + * Enable test mode. 
Use LocalJobRunner to run the distcp jobs. + */ + @VisibleForTesting + static boolean enabledForTest = false; + + public DistCpProcedure() { + } + + /** + * The constructor of DistCpProcedure. + * + * @param name the name of the procedure. + * @param nextProcedure the name of the next procedure. + * @param delayDuration the delay duration when this procedure is delayed. + * @param context the federation balance context. + */ + public DistCpProcedure(String name, String nextProcedure, long delayDuration, + FedBalanceContext context) throws IOException { + super(name, nextProcedure, delayDuration); + this.context = context; + this.src = context.getSrc(); + this.dst = context.getDst(); + this.conf = context.getConf(); + this.client = new JobClient(conf); + this.stage = Stage.PRE_CHECK; + this.mapNum = context.getMapNum(); + this.bandWidth = context.getBandwidthLimit(); + this.forceCloseOpenFiles = context.getForceCloseOpenFiles(); + this.useMountReadOnly = context.getUseMountReadOnly(); + srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf); + dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf); + } + + @Override + public boolean execute() throws RetryException, IOException { + LOG.info("Stage={}", stage.name()); + switch (stage) { + case PRE_CHECK: + preCheck(); + return false; + case INIT_DISTCP: + initDistCp(); + return false; + case DIFF_DISTCP: + diffDistCp(); + return false; + case DISABLE_WRITE: + disableWrite(); + return false; + case FINAL_DISTCP: + finalDistCp(); + return false; + case FINISH: + finish(); + return true; + default: + throw new IOException("Unexpected stage=" + stage); + } + } + + /** + * Pre check of src and dst. + */ + void preCheck() throws IOException { + FileStatus status = srcFs.getFileStatus(src); + if (!status.isDirectory()) { + throw new IOException(src + " should be a directory."); + } + if (dstFs.exists(dst)) { + throw new IOException(dst + " already exists."); + } + if (srcFs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR))) { + throw new IOException(src + " shouldn't enable snapshot."); + } + updateStage(Stage.INIT_DISTCP); + } + + /** + * The initial distcp. Copying src to dst. + */ + void initDistCp() throws IOException, RetryException { + RunningJobStatus job = getCurrentJob(); + if (job != null) { + // the distcp has been submitted. + if (job.isComplete()) { + jobId = null; // unset jobId because the job is done. + if (job.isSuccessful()) { + updateStage(Stage.DIFF_DISTCP); + return; + } else { + LOG.warn("DistCp failed. Failure={}", job.getFailureInfo()); + } + } else { + throw new RetryException(); + } + } else { + pathCheckBeforeInitDistcp(); + srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME); + jobId = submitDistCpJob( + src.toString() + HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR + + CURRENT_SNAPSHOT_NAME, dst.toString(), false); + } + } + + /** + * The distcp copying diffs between LAST_SNAPSHOT_NAME and + * CURRENT_SNAPSHOT_NAME. + */ + void diffDistCp() throws IOException, RetryException { + RunningJobStatus job = getCurrentJob(); + if (job != null) { + if (job.isComplete()) { + jobId = null; + if (job.isSuccessful()) { + LOG.info("DistCp succeeded. jobId={}", job.getJobID()); + } else { + throw new IOException("DistCp failed. jobId=" + job.getJobID() + + " failure=" + job.getFailureInfo()); + } + } else { + throw new RetryException(); // wait job complete. 
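A note on the RetryException flow above: execute() never blocks on the
MapReduce job; it throws RetryException and relies on the scheduler to run it
again after the configured delay. Sketched from the BalanceProcedure usage
visible in this patch (not from the scheduler internals), the contract is
roughly:

    // Hypothetical driver loop; the real BalanceProcedureScheduler also
    // journals the procedure between runs so it can survive a restart.
    boolean done = false;
    while (!done) {
      try {
        done = procedure.execute();    // false: current stage not finished
      } catch (BalanceProcedure.RetryException e) {
        Thread.sleep(delayDuration);   // back off, then poll again
      }
    }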
+ } + } else if (!verifyDiff()) { + if (!verifyOpenFiles() || forceCloseOpenFiles) { + updateStage(Stage.DISABLE_WRITE); + } else { + throw new RetryException(); + } + } else { + submitDiffDistCp(); + } + } + + /** + * Disable write either by making the mount entry readonly or cancelling the + * execute permission of the source path. + */ + void disableWrite() throws IOException { + if (useMountReadOnly) { + String mount = context.getMount(); + MountTableProcedure.disableWrite(mount, conf); + } else { + // Save and cancel permission. + FileStatus status = srcFs.getFileStatus(src); + fPerm = status.getPermission(); + acl = srcFs.getAclStatus(src); + srcFs.setPermission(src, FsPermission.createImmutable((short) 0)); + } + updateStage(Stage.FINAL_DISTCP); + } + + /** + * Enable write by restoring the x permission. + */ + void restorePermission() throws IOException { + // restore permission. + dstFs.removeAcl(dst); + if (acl != null) { + dstFs.modifyAclEntries(dst, acl.getEntries()); + } + if (fPerm != null) { + dstFs.setPermission(dst, fPerm); + } + } + + /** + * Close all open files then submit the distcp with -diff. + */ + void finalDistCp() throws IOException, RetryException { + // Close all open files then do the final distcp. + closeAllOpenFiles(srcFs, src); + // Final distcp. + RunningJobStatus job = getCurrentJob(); + if (job != null) { + // the distcp has been submitted. + if (job.isComplete()) { + jobId = null; // unset jobId because the job is done. + if (job.isSuccessful()) { + updateStage(Stage.FINISH); + return; + } else { + throw new IOException( + "Final DistCp failed. Failure: " + job.getFailureInfo()); + } + } else { + throw new RetryException(); + } + } else { + submitDiffDistCp(); + } + } + + void finish() throws IOException { + if (!useMountReadOnly) { + restorePermission(); + } + if (srcFs.exists(src)) { + cleanupSnapshot(srcFs, src); + } + if (dstFs.exists(dst)) { + cleanupSnapshot(dstFs, dst); + } + } + + @VisibleForTesting + Stage getStage() { + return stage; + } + + @VisibleForTesting + void updateStage(Stage value) { + String oldStage = stage == null ? "null" : stage.name(); + String newStage = value == null ? "null" : value.name(); + LOG.info("Stage updated from {} to {}.", oldStage, newStage); + stage = value; + } + + /** + * Submit distcp with -diff option to do the incremental copy. + * + * | the source path | the dst path | + * | LAST_SNAPSHOT_NAME | LAST_SNAPSHOT_NAME | + * | CURRENT_SNAPSHOT_NAME | + * + * 1. Cleanup all the last snapshots. If there are no last snapshots then do + * nothing. + * 2. Create the dst path snapshot named the last snapshot. + * 3. Rename the source path current snapshot as the last snapshot. The dst + * path last snapshot and the source path last snapshot are the same now. + * 4. Create the current snapshot of the source path. + * 5. Submit the distcp job. The incremental part is from the source path last + * snapshot to the source path current snapshot. + */ + private void submitDiffDistCp() throws IOException { + enableSnapshot(dstFs, dst); + deleteSnapshot(srcFs, src, LAST_SNAPSHOT_NAME); + deleteSnapshot(dstFs, dst, LAST_SNAPSHOT_NAME); + dstFs.createSnapshot(dst, LAST_SNAPSHOT_NAME); + srcFs.renameSnapshot(src, CURRENT_SNAPSHOT_NAME, LAST_SNAPSHOT_NAME); + srcFs.createSnapshot(src, CURRENT_SNAPSHOT_NAME); + jobId = submitDistCpJob(src.toString(), dst.toString(), true); + } + + /** + * Close all open files. Block until all the files are closed. 
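For reference, a concrete trace of the snapshot rotation that
submitDiffDistCp() implements, writing LAST and CURRENT for
LAST_SNAPSHOT_NAME and CURRENT_SNAPSHOT_NAME:

    round 1: src has CURRENT (created by initDistCp); dst has no snapshots
      - delete stale LAST on src and dst    (a no-op in the first round)
      - snapshot dst as LAST                (dst content == src@CURRENT)
      - rename src CURRENT -> LAST          (now src@LAST == dst@LAST)
      - snapshot src as CURRENT             (captures the writes since)
      - distcp -diff LAST CURRENT src dst   (ships only the delta)
    round 2+: the same rotation, each round copying the writes that landed
      while the previous round was running.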
+   */
+  private void closeAllOpenFiles(DistributedFileSystem dfs, Path path)
+      throws IOException {
+    String pathStr = path.toUri().getPath();
+    while (true) {
+      RemoteIterator<OpenFileEntry> iterator =
+          dfs.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), pathStr);
+      if (!iterator.hasNext()) { // all files have been closed.
+        break;
+      }
+      while (iterator.hasNext()) {
+        OpenFileEntry e = iterator.next();
+        try {
+          srcFs.recoverLease(new Path(e.getFilePath()));
+        } catch (IOException re) {
+          // ignore recoverLease error.
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify whether the src has changed since the CURRENT_SNAPSHOT_NAME
+   * snapshot.
+   *
+   * @return true if the src has changed.
+   */
+  private boolean verifyDiff() throws IOException {
+    SnapshotDiffReport diffReport =
+        srcFs.getSnapshotDiffReport(src, CURRENT_SNAPSHOT_NAME, "");
+    return diffReport.getDiffList().size() > 0;
+  }
+
+  /**
+   * Verify whether there are any open files under src.
+   *
+   * @return true if there are open files.
+   */
+  private boolean verifyOpenFiles() throws IOException {
+    RemoteIterator<OpenFileEntry> iterator = srcFs
+        .listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+            src.toString());
+    return iterator.hasNext();
+  }
+
+  private RunningJobStatus getCurrentJob() throws IOException {
+    if (jobId != null) {
+      if (enabledForTest) {
+        return getCurrentLocalJob();
+      } else {
+        RunningJob latestJob = client.getJob(JobID.forName(jobId));
+        return latestJob == null ? null : new YarnRunningJobStatus(latestJob);
+      }
+    }
+    return null;
+  }
+
+  private LocalJobStatus getCurrentLocalJob() throws IOException {
+    if (localJob != null) {
+      Job latestJob;
+      try {
+        latestJob = localJob.getCluster().getJob(JobID.forName(jobId));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      return latestJob == null ? null : new LocalJobStatus(latestJob);
+    } else {
+      return null;
+    }
+  }
+
+  private void pathCheckBeforeInitDistcp() throws IOException {
+    if (dstFs.exists(dst)) { // clean up.
+      throw new IOException("The dst path=" + dst + " already exists. The admin" +
+          " should delete it before submitting the initial distcp job.");
+    }
+    Path snapshotPath = new Path(src,
+        HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + CURRENT_SNAPSHOT_NAME);
+    if (srcFs.exists(snapshotPath)) {
+      throw new IOException("The src snapshot=" + snapshotPath +
+          " already exists. The admin should delete the snapshot before" +
+          " submitting the initial distcp.");
+    }
+    srcFs.allowSnapshot(src);
+  }
+
+  /**
+   * Submit the distcp job and return the job id.
+ */ + private String submitDistCpJob(String srcParam, String dstParam, + boolean useSnapshotDiff) throws IOException { + List command = new ArrayList<>(); + command.addAll(Arrays + .asList(new String[] {"-async", "-update", "-append", "-pruxgpcab"})); + if (useSnapshotDiff) { + command.add("-diff"); + command.add(LAST_SNAPSHOT_NAME); + command.add(CURRENT_SNAPSHOT_NAME); + } + command.add("-m"); + command.add(mapNum + ""); + command.add("-bandwidth"); + command.add(bandWidth + ""); + command.add(srcParam); + command.add(dstParam); + + Configuration config = new Configuration(conf); + DistCp distCp; + try { + distCp = new DistCp(config, + OptionsParser.parse(command.toArray(new String[]{}))); + Job job = distCp.createAndSubmitJob(); + LOG.info("Submit distcp job={}", job); + if (enabledForTest) { + localJob = job; + } + return job.getJobID().toString(); + } catch (Exception e) { + throw new IOException("Submit job failed.", e); + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + context.write(out); + if (jobId == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + Text.writeString(out, jobId); + } + out.writeInt(stage.ordinal()); + if (fPerm == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeShort(fPerm.toShort()); + } + if (acl == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + PBHelperClient.convert(acl).writeDelimitedTo(bout); + byte[] data = bout.toByteArray(); + out.writeInt(data.length); + out.write(data); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + context = new FedBalanceContext(); + context.readFields(in); + src = context.getSrc(); + dst = context.getDst(); + conf = context.getConf(); + if (in.readBoolean()) { + jobId = Text.readString(in); + } + stage = Stage.values()[in.readInt()]; + if (in.readBoolean()) { + fPerm = FsPermission.read(in); + } + if (in.readBoolean()) { + int len = in.readInt(); + byte[] data = new byte[len]; + in.readFully(data); + ByteArrayInputStream bin = new ByteArrayInputStream(data); + AclProtos.GetAclStatusResponseProto proto = + AclProtos.GetAclStatusResponseProto.parseDelimitedFrom(bin); + acl = PBHelperClient.convert(proto); + } + srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf); + dstFs = (DistributedFileSystem) context.getDst().getFileSystem(conf); + mapNum = context.getMapNum(); + bandWidth = context.getBandwidthLimit(); + forceCloseOpenFiles = context.getForceCloseOpenFiles(); + useMountReadOnly = context.getUseMountReadOnly(); + this.client = new JobClient(conf); + } + + private static void enableSnapshot(DistributedFileSystem dfs, Path path) + throws IOException { + if (!dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) { + dfs.allowSnapshot(path); + } + } + + static void deleteSnapshot(DistributedFileSystem dfs, Path path, + String snapshotName) throws IOException { + Path snapshot = + new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR_SEPARATOR + snapshotName); + if (dfs.exists(snapshot)) { + dfs.deleteSnapshot(path, snapshotName); + } + } + + static void cleanupSnapshot(DistributedFileSystem dfs, Path path) + throws IOException { + if (dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR))) { + FileStatus[] status = + dfs.listStatus(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR)); + for (FileStatus s : status) { + deleteSnapshot(dfs, path, s.getPath().getName()); 
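The write()/readFields() pair above is what makes the procedure recoverable:
the scheduler can journal a half-finished procedure and rebuild it in another
process. A round-trip sketch under that assumption (imports elided; the actual
journal format belongs to BalanceProcedureScheduler):

    static DistCpProcedure roundTrip(DistCpProcedure p) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      p.write(new DataOutputStream(buf));          // journal write
      DistCpProcedure q = new DistCpProcedure();   // no-arg recovery ctor
      q.readFields(new DataInputStream(
          new ByteArrayInputStream(buf.toByteArray())));
      return q; // same context, stage, jobId, saved permission and ACL
    }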
+ } + dfs.disallowSnapshot(path); + } + } + + interface RunningJobStatus { + String getJobID(); + + boolean isComplete() throws IOException; + + boolean isSuccessful() throws IOException; + + String getFailureInfo() throws IOException; + } + + private static class YarnRunningJobStatus implements RunningJobStatus { + + private final RunningJob job; + + YarnRunningJobStatus(RunningJob job) { + this.job = job; + } + + @Override + public String getJobID() { + return job.getID().toString(); + } + + @Override + public boolean isComplete() throws IOException { + return job.isComplete(); + } + + @Override + public boolean isSuccessful() throws IOException { + return job.isSuccessful(); + } + + @Override + public String getFailureInfo() throws IOException { + return job.getFailureInfo(); + } + } + + private static class LocalJobStatus implements RunningJobStatus { + + private final Job testJob; + + LocalJobStatus(Job testJob) { + this.testJob = testJob; + } + + @Override + public String getJobID() { + return testJob.getJobID().toString(); + } + + @Override + public boolean isComplete() throws IOException { + return testJob.isComplete(); + } + + @Override + public boolean isSuccessful() throws IOException { + return testJob.isSuccessful(); + } + + @Override + public String getFailureInfo() throws IOException { + try { + return testJob.getStatus().getFailureInfo(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java new file mode 100644 index 0000000000..adfb40bf74 --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.ROUTER;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.FORCE_CLOSE_OPEN;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.MAP;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.BANDWIDTH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.TRASH;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.DELAY_DURATION;
+import static org.apache.hadoop.tools.fedbalance.DistCpBalanceOptions.CLI_OPTIONS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.FEDERATION_BALANCE_CLASS;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * Balance data from the src cluster to the dst cluster with distcp.
+ *
+ * 1. Move data from the source path to the destination path with distcp.
+ * 2. Update the mount entry.
+ * 3. Move the source path to trash.
+ */
+public class FedBalance extends Configured implements Tool {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FedBalance.class);
+  private static final String SUBMIT_COMMAND = "submit";
+  private static final String CONTINUE_COMMAND = "continue";
+  private static final String NO_MOUNT = "no-mount";
+  private static final String DISTCP_PROCEDURE = "distcp-procedure";
+  private static final String MOUNT_TABLE_PROCEDURE = "mount-table-procedure";
+  private static final String TRASH_PROCEDURE = "trash-procedure";
+
+  /**
+   * This class helps build the balance job.
+   */
+  private class Builder {
+    /* Balancing in an rbf cluster. */
+    private boolean routerCluster = false;
+    /* Force close all open files while there is no diff. */
+    private boolean forceCloseOpen = false;
+    /* Max number of concurrent maps to use for copy. */
+    private int map = 10;
+    /* Specify bandwidth per map in MB. */
+    private int bandwidth = 10;
+    /* Specify the trash behaviour of the source path. */
+    private TrashOption trashOpt = TrashOption.TRASH;
+    /* Specify the duration (in milliseconds) when the procedure needs retry. */
+    private long delayDuration = TimeUnit.SECONDS.toMillis(1);
+    /* The source input. This specifies the source path.
*/ + private final String inputSrc; + /* The dst input. This specifies the dst path. */ + private final String inputDst; + + Builder(String inputSrc, String inputDst) { + this.inputSrc = inputSrc; + this.inputDst = inputDst; + } + + /** + * Whether balancing in an rbf cluster. + * @param value true if it's running in a router-based federation cluster. + */ + public Builder setRouterCluster(boolean value) { + this.routerCluster = value; + return this; + } + + /** + * Whether force close all open files while there is no diff. + * @param value true if force close all the open files. + */ + public Builder setForceCloseOpen(boolean value) { + this.forceCloseOpen = value; + return this; + } + + /** + * Max number of concurrent maps to use for copy. + * @param value the map number of the distcp. + */ + public Builder setMap(int value) { + this.map = value; + return this; + } + + /** + * Specify bandwidth per map in MB. + * @param value the bandwidth. + */ + public Builder setBandWidth(int value) { + this.bandwidth = value; + return this; + } + + /** + * Specify the trash behaviour of the source path. + * @param value the trash option. + */ + public Builder setTrashOpt(TrashOption value) { + this.trashOpt = value; + return this; + } + + /** + * Specify the duration(millie seconds) when the procedure needs retry. + * @param value the delay duration of the job. + */ + public Builder setDelayDuration(long value) { + this.delayDuration = value; + return this; + } + + /** + * Build the balance job. + */ + public BalanceJob build() throws IOException { + // Construct job context. + FedBalanceContext context; + Path dst = new Path(inputDst); + if (dst.toUri().getAuthority() == null) { + throw new IOException("The destination cluster must be specified."); + } + if (routerCluster) { // router-based federation. + Path src = getSrcPath(inputSrc); + String mount = inputSrc; + context = new FedBalanceContext.Builder(src, dst, mount, getConf()) + .setForceCloseOpenFiles(forceCloseOpen) + .setUseMountReadOnly(routerCluster).setMapNum(map) + .setBandwidthLimit(bandwidth).setTrash(trashOpt) + .setDelayDuration(delayDuration).build(); + } else { // normal federation cluster. + Path src = new Path(inputSrc); + if (src.toUri().getAuthority() == null) { + throw new IOException("The source cluster must be specified."); + } + context = new FedBalanceContext.Builder(src, dst, NO_MOUNT, getConf()) + .setForceCloseOpenFiles(forceCloseOpen) + .setUseMountReadOnly(routerCluster).setMapNum(map) + .setBandwidthLimit(bandwidth).setTrash(trashOpt).build(); + } + + LOG.info(context.toString()); + // Construct the balance job. 
+ BalanceJob.Builder builder = new BalanceJob.Builder<>(); + DistCpProcedure dcp = + new DistCpProcedure(DISTCP_PROCEDURE, null, delayDuration, context); + builder.nextProcedure(dcp); + if (routerCluster) { + MountTableProcedure mtp = + new MountTableProcedure(MOUNT_TABLE_PROCEDURE, null, delayDuration, + inputSrc, dst.toUri().getPath(), dst.toUri().getAuthority(), + getConf()); + builder.nextProcedure(mtp); + } + TrashProcedure tp = + new TrashProcedure(TRASH_PROCEDURE, null, delayDuration, context); + builder.nextProcedure(tp); + return builder.build(); + } + } + + public FedBalance() { + super(); + } + + @Override + public int run(String[] args) throws Exception { + CommandLineParser parser = new GnuParser(); + CommandLine command = + parser.parse(DistCpBalanceOptions.CLI_OPTIONS, args, true); + String[] leftOverArgs = command.getArgs(); + if (leftOverArgs == null || leftOverArgs.length < 1) { + printUsage(); + return -1; + } + String cmd = leftOverArgs[0]; + if (cmd.equals(SUBMIT_COMMAND)) { + if (leftOverArgs.length < 3) { + printUsage(); + return -1; + } + String inputSrc = leftOverArgs[1]; + String inputDst = leftOverArgs[2]; + return submit(command, inputSrc, inputDst); + } else if (cmd.equals(CONTINUE_COMMAND)) { + return continueJob(); + } else { + printUsage(); + return -1; + } + } + + /** + * Recover and continue the unfinished jobs. + */ + private int continueJob() throws InterruptedException { + BalanceProcedureScheduler scheduler = + new BalanceProcedureScheduler(getConf()); + try { + scheduler.init(true); + while (true) { + Collection jobs = scheduler.getAllJobs(); + int unfinished = 0; + for (BalanceJob job : jobs) { + if (!job.isJobDone()) { + unfinished++; + } + LOG.info(job.toString()); + } + if (unfinished == 0) { + break; + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } + } catch (IOException e) { + LOG.error("Continue balance job failed.", e); + return -1; + } finally { + scheduler.shutDown(); + } + return 0; + } + + /** + * Start a ProcedureScheduler and submit the job. + * + * @param command the command options. + * @param inputSrc the source input. This specifies the source path. + * @param inputDst the dst input. This specifies the dst path. + */ + private int submit(CommandLine command, String inputSrc, String inputDst) + throws IOException { + Builder builder = new Builder(inputSrc, inputDst); + // parse options. + builder.setRouterCluster(command.hasOption(ROUTER.getOpt())); + builder.setForceCloseOpen(command.hasOption(FORCE_CLOSE_OPEN.getOpt())); + if (command.hasOption(MAP.getOpt())) { + builder.setMap(Integer.parseInt(command.getOptionValue(MAP.getOpt()))); + } + if (command.hasOption(BANDWIDTH.getOpt())) { + builder.setBandWidth( + Integer.parseInt(command.getOptionValue(BANDWIDTH.getOpt()))); + } + if (command.hasOption(DELAY_DURATION.getOpt())) { + builder.setDelayDuration( + Long.parseLong(command.getOptionValue(DELAY_DURATION.getOpt()))); + } + if (command.hasOption(TRASH.getOpt())) { + String val = command.getOptionValue(TRASH.getOpt()); + if (val.equalsIgnoreCase("skip")) { + builder.setTrashOpt(TrashOption.SKIP); + } else if (val.equalsIgnoreCase("trash")) { + builder.setTrashOpt(TrashOption.TRASH); + } else if (val.equalsIgnoreCase("delete")) { + builder.setTrashOpt(TrashOption.DELETE); + } else { + printUsage(); + return -1; + } + } + + // Submit the job. 
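For reference, the programmatic equivalent of an end-to-end run (the
shellprofile.d entry added above exposes the same thing as a `hadoop`
subcommand). Paths and option values here are made up; option tokens must come
before `submit` because run() parses with stopAtNonOption=true:

    Configuration conf = new HdfsConfiguration();
    int exit = ToolRunner.run(conf, new FedBalance(), new String[] {
        "-router", "-forceCloseOpen", "-map", "20", "-bandwidth", "10",
        "submit", "/path/mount", "hdfs://ns-dst/path"});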
+ BalanceProcedureScheduler scheduler = + new BalanceProcedureScheduler(getConf()); + scheduler.init(false); + try { + BalanceJob balanceJob = builder.build(); + // Submit and wait until the job is done. + scheduler.submit(balanceJob); + scheduler.waitUntilDone(balanceJob); + } catch (IOException e) { + LOG.error("Submit balance job failed.", e); + return -1; + } finally { + scheduler.shutDown(); + } + return 0; + } + + /** + * Get src uri from Router. + */ + private Path getSrcPath(String fedPath) throws IOException { + String address = getConf().getTrimmed( + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); + RouterClient rClient = new RouterClient(routerSocket, getConf()); + try { + MountTableManager mountTable = rClient.getMountTableManager(); + MountTable entry = MountTableProcedure.getMountEntry(fedPath, mountTable); + if (entry == null) { + throw new IllegalArgumentException( + "The mount point doesn't exist. path=" + fedPath); + } else if (entry.getDestinations().size() > 1) { + throw new IllegalArgumentException( + "The mount point has more than one destination. path=" + fedPath); + } else { + String ns = entry.getDestinations().get(0).getNameserviceId(); + String path = entry.getDestinations().get(0).getDest(); + return new Path("hdfs://" + ns + path); + } + } finally { + rClient.close(); + } + } + + private void printUsage() { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp( + "fedbalance OPTIONS [submit|continue] \n\nOPTIONS", + CLI_OPTIONS); + } + + /** + * Main function of the FedBalance program. Parses the input arguments and + * invokes the FedBalance::run() method, via the ToolRunner. + * @param argv Command-line arguments sent to FedBalance. + */ + public static void main(String[] argv) { + Configuration conf = new HdfsConfiguration(); + Class balanceClazz = (Class) conf + .getClass(FEDERATION_BALANCE_CLASS, FedBalance.class); + Tool balancer = ReflectionUtils.newInstance(balanceClazz, conf); + int exitCode; + try { + exitCode = ToolRunner.run(balancer, argv); + } catch (Exception e) { + LOG.warn("Couldn't complete FedBalance operation.", e); + exitCode = -1; + } + System.exit(exitCode); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java similarity index 72% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java index f869035196..952aef20d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureConfigKeys.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceConfigs.java @@ -15,16 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance; import org.apache.hadoop.classification.InterfaceAudience; /** - * This class contains constants for configuration keys and default values - * used in hdfs procedure. + * Federation balance configuration properties. 
*/ @InterfaceAudience.Private -public final class BalanceProcedureConfigKeys { +public final class FedBalanceConfigs { + /* The class used for federation balance */ + public static final String FEDERATION_BALANCE_CLASS = + "federation.balance.class"; + public static final String LAST_SNAPSHOT_NAME = "DISTCP-BALANCE-CURRENT"; + public static final String CURRENT_SNAPSHOT_NAME = "DISTCP-BALANCE-NEXT"; + /* Specify the behaviour of trash. */ + public enum TrashOption { + TRASH, DELETE, SKIP + } + /* The worker threads number of the BalanceProcedureScheduler */ public static final String WORK_THREAD_NUM = "hadoop.hdfs.procedure.work.thread.num"; @@ -37,5 +46,5 @@ public final class BalanceProcedureConfigKeys { public static final String JOURNAL_CLASS = "hadoop.hdfs.procedure.journal.class"; - private BalanceProcedureConfigKeys() {} + private FedBalanceConfigs(){} } diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java new file mode 100644 index 0000000000..56be7db48e --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.fedbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; + +/** + * This class contains the basic information needed when Federation Balance. + */ +public class FedBalanceContext implements Writable { + + /* the source path in the source sub-cluster */ + private Path src; + /* the target path in the target sub-cluster */ + private Path dst; + /* the mount point to be balanced */ + private String mount; + /* Force close all open files when there is no diff between src and dst */ + private boolean forceCloseOpenFiles; + /* Disable write by setting the mount point readonly. */ + private boolean useMountReadOnly; + /* The map number of the distcp job. */ + private int mapNum; + /* The bandwidth limit of the distcp job(MB). */ + private int bandwidthLimit; + /* Move source path to trash after all the data are sync to target. Otherwise + delete the source directly. */ + private TrashOption trashOpt; + /* How long will the procedures be delayed. 
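Since main() resolves the Tool through federation.balance.class, the balance
implementation is pluggable. A sketch — MyBalance is a hypothetical subclass
named only for illustration:

    // Select a custom Tool implementation for the fedbalance entry point.
    // MyBalance stands in for any FedBalance subclass.
    Configuration conf = new HdfsConfiguration();
    conf.setClass(FedBalanceConfigs.FEDERATION_BALANCE_CLASS,
        MyBalance.class, Tool.class);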
*/ + private long delayDuration; + + private Configuration conf; + + public FedBalanceContext() {} + + public Configuration getConf() { + return conf; + } + + public Path getSrc() { + return src; + } + + public Path getDst() { + return dst; + } + + public String getMount() { + return mount; + } + + public boolean getForceCloseOpenFiles() { + return forceCloseOpenFiles; + } + + public boolean getUseMountReadOnly() { + return useMountReadOnly; + } + + public int getMapNum() { + return mapNum; + } + + public int getBandwidthLimit() { + return bandwidthLimit; + } + + public TrashOption getTrashOpt() { + return trashOpt; + } + + @Override + public void write(DataOutput out) throws IOException { + conf.write(out); + Text.writeString(out, src.toString()); + Text.writeString(out, dst.toString()); + Text.writeString(out, mount); + out.writeBoolean(forceCloseOpenFiles); + out.writeBoolean(useMountReadOnly); + out.writeInt(mapNum); + out.writeInt(bandwidthLimit); + out.writeInt(trashOpt.ordinal()); + out.writeLong(delayDuration); + } + + @Override + public void readFields(DataInput in) throws IOException { + conf = new Configuration(false); + conf.readFields(in); + src = new Path(Text.readString(in)); + dst = new Path(Text.readString(in)); + mount = Text.readString(in); + forceCloseOpenFiles = in.readBoolean(); + useMountReadOnly = in.readBoolean(); + mapNum = in.readInt(); + bandwidthLimit = in.readInt(); + trashOpt = TrashOption.values()[in.readInt()]; + delayDuration = in.readLong(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (obj.getClass() != getClass()) { + return false; + } + FedBalanceContext bc = (FedBalanceContext) obj; + return new EqualsBuilder() + .append(src, bc.src) + .append(dst, bc.dst) + .append(mount, bc.mount) + .append(forceCloseOpenFiles, bc.forceCloseOpenFiles) + .append(useMountReadOnly, bc.useMountReadOnly) + .append(mapNum, bc.mapNum) + .append(bandwidthLimit, bc.bandwidthLimit) + .append(trashOpt, bc.trashOpt) + .append(delayDuration, bc.delayDuration) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(src) + .append(dst) + .append(mount) + .append(forceCloseOpenFiles) + .append(useMountReadOnly) + .append(mapNum) + .append(bandwidthLimit) + .append(trashOpt) + .append(delayDuration) + .build(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("FedBalance context:"); + builder.append(" src=").append(src); + builder.append(", dst=").append(dst); + if (useMountReadOnly) { + builder.append(", router-mode=true"); + builder.append(", mount-point=").append(mount); + } else { + builder.append(", router-mode=false"); + } + builder.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles); + builder.append(", trash=").append(trashOpt.name()); + builder.append(", map=").append(mapNum); + builder.append(", bandwidth=").append(bandwidthLimit); + builder.append(", delayDuration=").append(delayDuration); + return builder.toString(); + } + + static class Builder { + private final Path src; + private final Path dst; + private final String mount; + private final Configuration conf; + private boolean forceCloseOpenFiles = false; + private boolean useMountReadOnly = false; + private int mapNum; + private int bandwidthLimit; + private TrashOption trashOpt; + private long delayDuration; + + /** + * This class helps building the FedBalanceContext. 
+ * + * @param src the source path in the source sub-cluster. + * @param dst the target path in the target sub-cluster. + * @param mount the mount point to be balanced. + * @param conf the configuration. + */ + Builder(Path src, Path dst, String mount, Configuration conf) { + this.src = src; + this.dst = dst; + this.mount = mount; + this.conf = conf; + } + + /** + * Force close open files. + * @param value true if force close all the open files. + */ + public Builder setForceCloseOpenFiles(boolean value) { + this.forceCloseOpenFiles = value; + return this; + } + + /** + * Use mount point readonly to disable write. + * @param value true if disabling write by setting mount point readonly. + */ + public Builder setUseMountReadOnly(boolean value) { + this.useMountReadOnly = value; + return this; + } + + /** + * The map number of the distcp job. + * @param value the map number of the distcp. + */ + public Builder setMapNum(int value) { + this.mapNum = value; + return this; + } + + /** + * The bandwidth limit of the distcp job(MB). + * @param value the bandwidth. + */ + public Builder setBandwidthLimit(int value) { + this.bandwidthLimit = value; + return this; + } + + /** + * Specify the trash behaviour after all the data is sync to the target. + * @param value the trash option. + * */ + public Builder setTrash(TrashOption value) { + this.trashOpt = value; + return this; + } + + /** + * Specify the delayed duration when the procedures need to retry. + */ + public Builder setDelayDuration(long value) { + this.delayDuration = value; + return this; + } + + /** + * Build the FedBalanceContext. + * + * @return the FedBalanceContext obj. + */ + public FedBalanceContext build() { + FedBalanceContext context = new FedBalanceContext(); + context.src = this.src; + context.dst = this.dst; + context.mount = this.mount; + context.conf = this.conf; + context.forceCloseOpenFiles = this.forceCloseOpenFiles; + context.useMountReadOnly = this.useMountReadOnly; + context.mapNum = this.mapNum; + context.bandwidthLimit = this.bandwidthLimit; + context.trashOpt = this.trashOpt; + context.delayDuration = this.delayDuration; + return context; + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java new file mode 100644 index 0000000000..8f789831d3 --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/MountTableProcedure.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.fedbalance; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; + +/** + * Update mount table. + * Old mount table: + * /a/b/c -> {ns:src path:/a/b/c} + * New mount table: + * /a/b/c -> {ns:dst path:/a/b/c} + */ +public class MountTableProcedure extends BalanceProcedure { + + private String mount; + private String dstPath; + private String dstNs; + private Configuration conf; + + public MountTableProcedure() {} + + /** + * Update mount entry to specified dst uri. + * + * @param mount the mount entry to be updated. + * @param dstPath the sub-cluster uri of the dst path. + * @param conf the configuration. + */ + public MountTableProcedure(String name, String nextProcedure, + long delayDuration, String mount, String dstPath, String dstNs, + Configuration conf) throws IOException { + super(name, nextProcedure, delayDuration); + this.mount = mount; + this.dstPath = dstPath; + this.dstNs = dstNs; + this.conf = conf; + } + + @Override + public boolean execute() throws RetryException, IOException { + updateMountTable(); + return true; + } + + private void updateMountTable() throws IOException { + updateMountTableDestination(mount, dstNs, dstPath, conf); + enableWrite(mount, conf); + } + + /** + * Update the destination of the mount point to target namespace and target + * path. + * + * @param mount the mount point. + * @param dstNs the target namespace. + * @param dstPath the target path + * @param conf the configuration of the router. 
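Operationally this procedure automates the switch an admin would otherwise do
by hand with the RBF admin CLI — roughly the following, though the exact flags
depend on the release, and the procedure additionally flips the entry back to
writable once the destination is updated:

    hdfs dfsrouteradmin -update /a/b/c ns-dst /a/b/c
    hdfs dfsrouteradmin -refresh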
+ */ + private static void updateMountTableDestination(String mount, String dstNs, + String dstPath, Configuration conf) throws IOException { + String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT); + InetSocketAddress routerSocket = NetUtils.createSocketAddr(address); + RouterClient rClient = new RouterClient(routerSocket, conf); + try { + MountTableManager mountTable = rClient.getMountTableManager(); + + MountTable originalEntry = getMountEntry(mount, mountTable); + if (originalEntry == null) { + throw new IOException("Mount table " + mount + " doesn't exist"); + } else { + RemoteLocation remoteLocation = + new RemoteLocation(dstNs, dstPath, mount); + originalEntry.setDestinations(Arrays.asList(remoteLocation)); + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(originalEntry); + UpdateMountTableEntryResponse response = + mountTable.updateMountTableEntry(updateRequest); + if (!response.getStatus()) { + throw new IOException("Failed update mount table " + mount); + } + rClient.getMountTableManager().refreshMountTableEntries( + RefreshMountTableEntriesRequest.newInstance()); + } + } finally { + rClient.close(); + } + } + + /** + * Gets the mount table entry. + * @param mount name of the mount entry. + * @param mountTable the mount table. + * @return corresponding mount entry. + * @throws IOException in case of failure to retrieve mount entry. + */ + public static MountTable getMountEntry(String mount, + MountTableManager mountTable) + throws IOException { + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mount); + GetMountTableEntriesResponse getResponse = + mountTable.getMountTableEntries(getRequest); + List results = getResponse.getEntries(); + MountTable existingEntry = null; + for (MountTable result : results) { + if (mount.equals(result.getSourcePath())) { + existingEntry = result; + break; + } + } + return existingEntry; + } + + /** + * Disable write by making the mount point readonly. + * + * @param mount the mount point to set readonly. + * @param conf the configuration of the router. + */ + static void disableWrite(String mount, Configuration conf) + throws IOException { + setMountReadOnly(mount, true, conf); + } + + /** + * Enable write by cancelling the mount point readonly. + * + * @param mount the mount point to cancel readonly. + * @param conf the configuration of the router. + */ + static void enableWrite(String mount, Configuration conf) throws IOException { + setMountReadOnly(mount, false, conf); + } + + /** + * Enable or disable readonly of the mount point. + * + * @param mount the mount point. + * @param readOnly enable or disable readonly. + * @param conf the configuration of the router. 
+   */
+  private static void setMountReadOnly(String mount, boolean readOnly,
+      Configuration conf) throws IOException {
+    String address = conf.getTrimmed(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+    InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
+    RouterClient rClient = new RouterClient(routerSocket, conf);
+    try {
+      MountTableManager mountTable = rClient.getMountTableManager();
+
+      MountTable originalEntry = getMountEntry(mount, mountTable);
+      if (originalEntry == null) {
+        throw new IOException("Mount table " + mount + " doesn't exist");
+      } else {
+        originalEntry.setReadOnly(readOnly);
+        UpdateMountTableEntryRequest updateRequest =
+            UpdateMountTableEntryRequest.newInstance(originalEntry);
+        UpdateMountTableEntryResponse response =
+            mountTable.updateMountTableEntry(updateRequest);
+        if (!response.getStatus()) {
+          throw new IOException(
+              "Failed to update mount table " + mount + " with readonly="
+                  + readOnly);
+        }
+        rClient.getMountTableManager().refreshMountTableEntries(
+            RefreshMountTableEntriesRequest.newInstance());
+      }
+    } finally {
+      rClient.close();
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, mount);
+    Text.writeString(out, dstPath);
+    Text.writeString(out, dstNs);
+    conf.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    mount = Text.readString(in);
+    dstPath = Text.readString(in);
+    dstNs = Text.readString(in);
+    conf = new Configuration(false);
+    conf.readFields(in);
+  }
+
+  @VisibleForTesting
+  String getMount() {
+    return mount;
+  }
+
+  @VisibleForTesting
+  String getDstPath() {
+    return dstPath;
+  }
+
+  @VisibleForTesting
+  String getDstNs() {
+    return dstNs;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
new file mode 100644
index 0000000000..94ae6160b0
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/TrashProcedure.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.fedbalance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
+
+/**
+ * This procedure moves the source path to the corresponding trash.
+ */
+public class TrashProcedure extends BalanceProcedure {
+
+  private DistributedFileSystem srcFs;
+  private FedBalanceContext context;
+  private Configuration conf;
+
+  public TrashProcedure() {}
+
+  /**
+   * The constructor of TrashProcedure.
+   *
+   * @param name the name of the procedure.
+   * @param nextProcedure the name of the next procedure.
+   * @param delayDuration the delay duration when this procedure is delayed.
+   * @param context the federation balance context.
+   * @throws IOException if the source file system cannot be obtained.
+   */
+  public TrashProcedure(String name, String nextProcedure, long delayDuration,
+      FedBalanceContext context) throws IOException {
+    super(name, nextProcedure, delayDuration);
+    this.context = context;
+    this.conf = context.getConf();
+    this.srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+  }
+
+  @Override
+  public boolean execute() throws IOException {
+    moveToTrash();
+    return true;
+  }
+
+  /**
+   * Move the source path to trash or delete it, depending on the trash
+   * option of the context.
+   */
+  void moveToTrash() throws IOException {
+    Path src = context.getSrc();
+    if (srcFs.exists(src)) {
+      TrashOption trashOption = context.getTrashOpt();
+      switch (trashOption) {
+      case TRASH:
+        // Ensure the trash is enabled (60-minute interval) before the move.
+        conf.setFloat(FS_TRASH_INTERVAL_KEY, 60);
+        if (!Trash.moveToAppropriateTrash(srcFs, src, conf)) {
+          throw new IOException("Failed to move " + src + " to trash.");
+        }
+        break;
+      case DELETE:
+        if (!srcFs.delete(src, true)) {
+          throw new IOException("Failed to delete " + src);
+        }
+        LOG.info("{} is deleted.", src);
+        break;
+      case SKIP:
+        break;
+      default:
+        throw new IOException("Unexpected trash option=" + trashOption);
+      }
+    }
+  }
+
+  public FedBalanceContext getContext() {
+    return context;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    context.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    context = new FedBalanceContext();
+    context.readFields(in);
+    conf = context.getConf();
+    srcFs = (DistributedFileSystem) context.getSrc().getFileSystem(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
new file mode 100644
index 0000000000..3007402f69
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/package-info.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * FedBalance is a tool for balancing data across federation clusters. + */ +@InterfaceAudience.Public +package org.apache.hadoop.tools.fedbalance; +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java similarity index 99% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java index 847092a2aa..8d5f9d401a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJob.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJob.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.builder.EqualsBuilder; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java index 011ae857bc..da8eb74b2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournal.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournal.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.conf.Configurable; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java similarity index 95% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java index 4e759d8d7f..0da8c36637 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceJournalInfoHDFS.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceJournalInfoHDFS.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -37,9 +37,9 @@ import java.net.URI; import java.net.URISyntaxException; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.TMP_TAIL; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOB_PREFIX; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TMP_TAIL; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOB_PREFIX; /** * BalanceJournal based on HDFS. This class stores all the journals in the HDFS. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java index 6320e8fe99..080a73750e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -29,7 +29,7 @@ import java.io.DataOutput; import java.io.IOException; -import static org.apache.hadoop.hdfs.procedure.BalanceJob.NEXT_PROCEDURE_NONE; +import static org.apache.hadoop.tools.fedbalance.procedure.BalanceJob.NEXT_PROCEDURE_NONE; /** * The basic components of the Job. 
Extend this class to implement different diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java index 74606c5580..0f82b88f0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/BalanceProcedureScheduler.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import com.google.common.annotations.VisibleForTesting; @@ -40,9 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM_DEFAULT; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.JOURNAL_CLASS; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM_DEFAULT; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.JOURNAL_CLASS; /** * The state machine framework consist of: * Job: The state machine. It implements the basic logic of the diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java similarity index 95% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java rename to hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java index 626d3b3727..cb03d137fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/procedure/package-info.java +++ b/hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/package-info.java @@ -23,7 +23,7 @@ @InterfaceAudience.Private @InterfaceStability.Evolving -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh new file mode 100644 index 0000000000..2872c7afba --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/main/shellprofile.d/hadoop-federation-balance.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if ! declare -f hadoop_subcommand_fedbalance >/dev/null 2>/dev/null; then + + if [[ "${HADOOP_SHELL_EXECNAME}" = hadoop ]]; then + hadoop_add_subcommand "fedbalance" client "balance data between sub-clusters" + fi + + # this can't be indented otherwise shelldocs won't get it + +## @description fedbalance command for hadoop +## @audience public +## @stability stable +## @replaceable yes +function hadoop_subcommand_fedbalance +{ + # shellcheck disable=SC2034 + HADOOP_CLASSNAME=org.apache.hadoop.tools.fedbalance.FedBalance + hadoop_add_to_classpath_tools hadoop-distcp + hadoop_add_to_classpath_tools hadoop-federation-balance +} + +fi \ No newline at end of file diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java new file mode 100644 index 0000000000..ec565c36d8 --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java @@ -0,0 +1,446 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.fedbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException; +import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.util.Random; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; +import static org.apache.hadoop.test.GenericTestUtils.getMethodName; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test DistCpProcedure. 
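+ * The procedure is driven stage by stage against a 2-datanode
+ * MiniDFSCluster; each test creates its own tree under /user/foo and
+ * cleans it up afterwards.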
+ */
+public class TestDistCpProcedure {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  static final String MOUNT = "mock_mount_point";
+  private static final String SRCDAT = "srcdat";
+  private static final String DSTDAT = "dstdat";
+  private static final long BLOCK_SIZE = 1024;
+  private static final long FILE_SIZE = BLOCK_SIZE * 100;
+  private FileEntry[] srcfiles =
+      {new FileEntry(SRCDAT, true), new FileEntry(SRCDAT + "/a", false),
+          new FileEntry(SRCDAT + "/b", true),
+          new FileEntry(SRCDAT + "/b/c", false)};
+  private static String nnUri;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    DistCpProcedure.enabledForTest = true;
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+
+    String workPath =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
+    conf.set(SCHEDULER_JOURNAL_URI, workPath);
+
+    nnUri = FileSystem.getDefaultUri(conf).toString();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    DistCpProcedure.enabledForTest = false;
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testSuccessfulDistCpProcedure() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    FsPermission originalPerm = new FsPermission((short) 0777);
+    fs.setPermission(src, originalPerm);
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+    scheduler.init(true);
+
+    BalanceJob balanceJob =
+        new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+    scheduler.submit(balanceJob);
+    scheduler.waitUntilDone(balanceJob);
+    assertTrue(balanceJob.isJobDone());
+    if (balanceJob.getError() != null) {
+      throw balanceJob.getError();
+    }
+    assertNull(balanceJob.getError());
+    assertTrue(fs.exists(dst));
+    assertFalse(
+        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(
+        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    for (FileEntry e : srcfiles) { // Verify file lengths.
+      if (!e.isDir) {
+        Path targetFile = new Path(testRoot, e.path.replace(SRCDAT, DSTDAT));
+        assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
+      }
+    }
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testInitDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    // Set a restrictive permission on src.
+    fs.setPermission(src, FsPermission.createImmutable((short) 020));
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+    // Submit the distcp job.
+    try {
+      dcProcedure.initDistCp();
+    } catch (RetryException e) {
+      // Ignored: the distcp job was just submitted and is still running.
+    }
+    fs.delete(new Path(src, "a"), true);
+    // Wait until the job is done.
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        () -> dcProcedure.initDistCp());
+    assertTrue(fs.exists(dst));
+    // The copy works from the snapshot, so the deleted file is still copied.
+    assertTrue(fs.exists(new Path(dst, "a")));
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testDiffDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        () -> dcProcedure.initDistCp());
+    assertTrue(fs.exists(dst));
+
+    // Move the file out of src; the diff should remove it from dst.
+    fs.rename(new Path(src, "a"), new Path("/a"));
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertFalse(fs.exists(new Path(dst, "a")));
+    // Move the file back; the diff should copy it to dst again.
+    fs.rename(new Path("/a"), new Path(src, "a"));
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertTrue(fs.exists(new Path(dst, "a")));
+    // Append to src/a; the diff should sync the new length to dst.
+    OutputStream out = fs.append(new Path(src, "a"));
+    out.write("hello".getBytes());
+    out.close();
+    long len = fs.getFileStatus(new Path(src, "a")).getLen();
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    assertEquals(len, fs.getFileStatus(new Path(dst, "a")).getLen());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testStageFinalDistCp() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    // Open a file so a lease is held during the procedure.
+    OutputStream out = fs.append(new Path(src, "a"));
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
+        () -> dcProcedure.initDistCp());
+    executeProcedure(dcProcedure, Stage.FINISH,
+        () -> dcProcedure.finalDistCp());
+    // Verify all the open files have been closed.
+    intercept(RemoteException.class, "LeaseExpiredException",
+        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testStageFinish() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir."
+        + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    fs.mkdirs(src);
+    fs.mkdirs(dst);
+    fs.allowSnapshot(src);
+    fs.allowSnapshot(dst);
+    fs.createSnapshot(src, LAST_SNAPSHOT_NAME);
+    fs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
+    fs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
+    FsPermission originalPerm = new FsPermission((short) 0777);
+    fs.setPermission(src, originalPerm);
+
+    // Test the finish stage.
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    dcProcedure.disableWrite();
+    dcProcedure.finish();
+
+    // Verify path and permission.
+    assertTrue(fs.exists(dst));
+    assertFalse(fs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(fs.exists(new Path(dst, HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testRecoveryByStage() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    final DistCpProcedure[] dcp = new DistCpProcedure[1];
+    dcp[0] = new DistCpProcedure("distcp-procedure", null, 1000, context);
+
+    // Serialize and deserialize before each stage to verify the recovery.
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.INIT_DISTCP, () -> dcp[0].preCheck());
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.DIFF_DISTCP, () -> dcp[0].initDistCp());
+    fs.delete(new Path(src, "a"), true); // Make a difference from dst.
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
+    dcp[0] = serializeProcedure(dcp[0]);
+    executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
+    dcp[0] = serializeProcedure(dcp[0]);
+    OutputStream out = fs.append(new Path(src, "b/c"));
+    executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
+    intercept(RemoteException.class, "LeaseExpiredException",
+        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
+    dcp[0] = serializeProcedure(dcp[0]);
+    assertTrue(dcp[0].execute());
+    assertTrue(fs.exists(dst));
+    assertFalse(
+        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    assertFalse(
+        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testShutdown() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir."
+        + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
+    scheduler.init(true);
+
+    BalanceJob balanceJob =
+        new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
+    scheduler.submit(balanceJob);
+
+    // Let the job run for a random time (< 10s), then shut down mid-job.
+    long sleep = new Random().nextInt(10000);
+    Thread.sleep(sleep);
+    scheduler.shutDown();
+    cleanup(fs, new Path(testRoot));
+  }
+
+  @Test(timeout = 30000)
+  public void testDisableWrite() throws Exception {
+    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    createFiles(fs, testRoot, srcfiles);
+    Path src = new Path(testRoot, SRCDAT);
+    Path dst = new Path(testRoot, DSTDAT);
+
+    FedBalanceContext context = buildContext(src, dst, MOUNT);
+    DistCpProcedure dcProcedure =
+        new DistCpProcedure("distcp-procedure", null, 1000, context);
+    assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
+        () -> dcProcedure.disableWrite());
+    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
+    cleanup(fs, new Path(testRoot));
+  }
+
+  private FedBalanceContext buildContext(Path src, Path dst, String mount) {
+    return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
+        .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
+        .build();
+  }
+
+  interface Call {
+    void execute() throws IOException, RetryException;
+  }
+
+  /**
+   * Execute the procedure until its stage is updated to the target stage.
+   *
+   * @param procedure the procedure to be executed and verified.
+   * @param target the target stage.
+   * @param call the function executing the procedure.
+   */
+  private static void executeProcedure(DistCpProcedure procedure, Stage target,
+      Call call) throws IOException {
+    Stage stage = Stage.PRE_CHECK;
+    procedure.updateStage(stage);
+    while (stage != target) {
+      try {
+        call.execute();
+      } catch (RetryException e) {
+        // Ignored: retry until the procedure reaches the target stage.
+      } finally {
+        stage = procedure.getStage();
+      }
+    }
+  }
+
+  static class FileEntry {
+    private String path;
+    private boolean isDir;
+
+    FileEntry(String path, boolean isDir) {
+      this.path = path;
+      this.isDir = isDir;
+    }
+
+    String getPath() {
+      return path;
+    }
+
+    boolean isDirectory() {
+      return isDir;
+    }
+  }
+
+  /**
+   * Create directories and files with random data.
+   *
+   * @param fs the file system object.
+   * @param topdir the base dir of the directories and files.
+   * @param entries the directory and file entries to be created.
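+   * @throws IOException if the files cannot be created.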
+ */ + private void createFiles(DistributedFileSystem fs, String topdir, + FileEntry[] entries) throws IOException { + long seed = System.currentTimeMillis(); + Random rand = new Random(seed); + short replicationFactor = 2; + for (FileEntry entry : entries) { + Path newPath = new Path(topdir + "/" + entry.getPath()); + if (entry.isDirectory()) { + fs.mkdirs(newPath); + } else { + int bufSize = 128; + DFSTestUtil.createFile(fs, newPath, bufSize, FILE_SIZE, BLOCK_SIZE, + replicationFactor, seed); + } + seed = System.currentTimeMillis() + rand.nextLong(); + } + } + + private DistCpProcedure serializeProcedure(DistCpProcedure dcp) + throws IOException { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + DataOutput dataOut = new DataOutputStream(bao); + dcp.write(dataOut); + dcp = new DistCpProcedure(); + dcp.readFields( + new DataInputStream(new ByteArrayInputStream(bao.toByteArray()))); + return dcp; + } + + private void cleanup(DistributedFileSystem dfs, Path root) + throws IOException { + Path src = new Path(root, SRCDAT); + Path dst = new Path(root, DSTDAT); + DistCpProcedure.cleanupSnapshot(dfs, src); + DistCpProcedure.cleanupSnapshot(dfs, dst); + dfs.delete(root, true); + } +} diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java new file mode 100644 index 0000000000..9dd4e5da8f --- /dev/null +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestMountTableProcedure.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.fedbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataOutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Basic tests of MountTableProcedure. 
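+ * The tests start a StateStoreDFSCluster with a single router and verify
+ * that the procedure can update a mount entry and toggle its readonly flag.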
+ */
+public class TestMountTableProcedure {
+
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static Configuration routerConf;
+  private static List<MountTable> mockMountTable;
+  private static StateStoreService stateStore;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+    // Build and start a router with State Store + admin + RPC.
+    Configuration conf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    cluster.addRouterOverrides(conf);
+    cluster.startRouters();
+    routerContext = cluster.getRandomRouter();
+    mockMountTable = cluster.generateMockMountTable();
+    Router router = routerContext.getRouter();
+    stateStore = router.getStateStore();
+
+    // Add two name services for testing.
+    ActiveNamenodeResolver membership = router.getNamenodeResolver();
+    membership.registerNamenode(createNamenodeReport("ns0", "nn1",
+        HAServiceProtocol.HAServiceState.ACTIVE));
+    membership.registerNamenode(createNamenodeReport("ns1", "nn1",
+        HAServiceProtocol.HAServiceState.ACTIVE));
+    stateStore.refreshCaches(true);
+
+    routerConf = new Configuration();
+    InetSocketAddress routerSocket = router.getAdminServerAddress();
+    routerConf.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        routerSocket);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.stopRouter(routerContext);
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+    assertTrue(
+        synchronizeRecords(stateStore, mockMountTable, MountTable.class));
+    // Avoid running with random users.
+    routerContext.resetAdminClient();
+  }
+
+  @Test
+  public void testUpdateMountpoint() throws Exception {
+    // First add the mount entry /test-path -> {ns0, /test-path}.
+    String mount = "/test-path";
+    String dst = "/test-dst";
+    MountTable newEntry = MountTable
+        .newInstance(mount, Collections.singletonMap("ns0", mount),
+            Time.now(), Time.now());
+    MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+    // Verify the mount entry is added successfully.
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance("/");
+    stateStore.loadCache(MountTableStoreImpl.class, true); // Load cache.
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    assertEquals(3, response.getEntries().size());
+
+    // Set the mount table entry to readonly.
+    MountTableProcedure.disableWrite(mount, routerConf);
+
+    // Test MountTableProcedure updates the mount point.
+    String dstNs = "ns1";
+    MountTableProcedure smtp =
+        new MountTableProcedure("single-mount-table-procedure", null,
+            1000, mount, dst, dstNs, routerConf);
+    assertTrue(smtp.execute());
+    stateStore.loadCache(MountTableStoreImpl.class, true); // Load cache.
+    // Verify the mount entry is updated to ns1 -> /test-dst.
+    MountTable entry =
+        MountTableProcedure.getMountEntry(mount, mountTable);
+    assertNotNull(entry);
+    assertEquals(1, entry.getDestinations().size());
+    String nsId = entry.getDestinations().get(0).getNameserviceId();
+    String dstPath = entry.getDestinations().get(0).getDest();
+    assertEquals(dstNs, nsId);
+    assertEquals(dst, dstPath);
+    // Verify the mount table is not readonly.
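+    // After enableWrite the mkdirs call passes the readonly check; it fails
+    // only because the mocked name services have no real namenode behind
+    // them, which is what the intercept below asserts.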
+    URI address = routerContext.getFileSystemURI();
+    DFSClient routerClient = new DFSClient(address, routerConf);
+    MountTableProcedure.enableWrite(mount, routerConf);
+    intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+        "Expect no namenode exception.", () -> routerClient
+            .mkdirs(mount + "/file", new FsPermission(020), false));
+  }
+
+  @Test
+  public void testDisableAndEnableWrite() throws Exception {
+    // First add the mount entry /test-write -> {ns0, /test-write}.
+    String mount = "/test-write";
+    MountTable newEntry = MountTable
+        .newInstance(mount, Collections.singletonMap("ns0", mount),
+            Time.now(), Time.now());
+    MountTableManager mountTable =
+        routerContext.getAdminClient().getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+    stateStore.loadCache(MountTableStoreImpl.class, true); // Load cache.
+
+    // Construct the client.
+    URI address = routerContext.getFileSystemURI();
+    DFSClient routerClient = new DFSClient(address, routerConf);
+    // Verify the mount point is not readonly.
+    intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+        "Expect no namenode exception.", () -> routerClient
+            .mkdirs(mount + "/file", new FsPermission(020), false));
+
+    // Verify disable write.
+    MountTableProcedure.disableWrite(mount, routerConf);
+    intercept(RemoteException.class, "is in a read only mount point",
+        "Expect readonly exception.", () -> routerClient
+            .mkdirs(mount + "/dir", new FsPermission(020), false));
+
+    // Verify enable write.
+    MountTableProcedure.enableWrite(mount, routerConf);
+    intercept(RemoteException.class, "No namenode available to invoke mkdirs",
+        "Expect no namenode exception.", () -> routerClient
+            .mkdirs(mount + "/file", new FsPermission(020), false));
+  }
+
+  @Test
+  public void testSeDeserialize() throws Exception {
+    String fedPath = "/test-path";
+    String dst = "/test-dst";
+    String dstNs = "ns1";
+    MountTableProcedure smtp =
+        new MountTableProcedure("single-mount-table-procedure", null,
+            1000, fedPath, dst, dstNs, routerConf);
+    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+    DataOutput dataOut = new DataOutputStream(bao);
+    smtp.write(dataOut);
+    smtp = new MountTableProcedure();
+    smtp.readFields(
+        new DataInputStream(new ByteArrayInputStream(bao.toByteArray())));
+    assertEquals(fedPath, smtp.getMount());
+    assertEquals(dst, smtp.getDstPath());
+    assertEquals(dstNs, smtp.getDstNs());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
new file mode 100644
index 0000000000..a128932d52
--- /dev/null
+++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestTrashProcedure.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.fedbalance; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption; +import static org.apache.hadoop.test.GenericTestUtils.getMethodName; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +/** + * Test TrashProcedure. + */ +public class TestTrashProcedure { + + private static Configuration conf; + private static MiniDFSCluster cluster; + private static String nnUri; + + @BeforeClass + public static void beforeClass() throws IOException { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + nnUri = FileSystem.getDefaultUri(conf).toString(); + } + + @AfterClass + public static void afterClass() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testTrashProcedure() throws Exception { + Path src = new Path("/" + getMethodName() + "-src"); + Path dst = new Path("/" + getMethodName() + "-dst"); + FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(src); + fs.mkdirs(new Path(src, "dir")); + assertTrue(fs.exists(src)); + + FedBalanceContext context = + new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf) + .setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH) + .build(); + TrashProcedure trashProcedure = + new TrashProcedure("trash-procedure", null, 1000, context); + trashProcedure.moveToTrash(); + assertFalse(fs.exists(src)); + } + + @Test + public void testSeDeserialize() throws Exception { + Path src = new Path("/" + getMethodName() + "-src"); + Path dst = new Path("/" + getMethodName() + "-dst"); + FedBalanceContext context = + new FedBalanceContext.Builder(src, dst, TestDistCpProcedure.MOUNT, conf) + .setMapNum(10).setBandwidthLimit(1).setTrash(TrashOption.TRASH) + .build(); + TrashProcedure trashProcedure = + new TrashProcedure("trash-procedure", null, 1000, context); + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + DataOutput dataOut = new DataOutputStream(bao); + trashProcedure.write(dataOut); + trashProcedure = new TrashProcedure(); + trashProcedure.readFields( + new DataInputStream(new ByteArrayInputStream(bao.toByteArray()))); + assertEquals(context, trashProcedure.getContext()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java similarity index 97% rename from 
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java index 27cfebd3a3..b9c9c1e1ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/MultiPhaseProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/MultiPhaseProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java index 706d4a1bce..9754b0994c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RecordProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RecordProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import java.util.ArrayList; import java.util.List; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java index 336873e6a8..faec834f98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/RetryProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/RetryProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import java.io.DataInput; import java.io.DataOutput; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java index 39e000b644..7a2b449ce4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/TestBalanceProcedureScheduler.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -43,8 +43,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.SCHEDULER_JOURNAL_URI; -import static org.apache.hadoop.hdfs.procedure.BalanceProcedureConfigKeys.WORK_THREAD_NUM; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; +import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.WORK_THREAD_NUM; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertEquals; @@ -70,6 +70,7 @@ public static void setup() throws IOException { CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs:///"); CONF.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true); CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + CONF.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); CONF.setInt(WORK_THREAD_NUM, 1); cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java index 941d0a0ae7..804f1aa548 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/UnrecoverableProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/UnrecoverableProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import java.io.IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java similarity index 97% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java rename to hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java index 8666caf2f6..af46b17afb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/procedure/WaitProcedure.java +++ b/hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/procedure/WaitProcedure.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.procedure; +package org.apache.hadoop.tools.fedbalance.procedure; import org.apache.hadoop.util.Time; diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml index f923bb7c36..cc811fca69 100644 --- a/hadoop-tools/hadoop-tools-dist/pom.xml +++ b/hadoop-tools/hadoop-tools-dist/pom.xml @@ -44,6 +44,11 @@ hadoop-distcp compile + + org.apache.hadoop + hadoop-federation-balance + compile + org.apache.hadoop hadoop-archives diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index eb0a31a367..f026bc261e 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -32,6 +32,7 @@ hadoop-streaming hadoop-distcp + hadoop-federation-balance hadoop-dynamometer hadoop-archives hadoop-archive-logs