diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 04a8ef1c9a..96b0f36f9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3509,7 +3509,7 @@ public class DataNode extends ReconfigurableBase
               + " Disk balancing not permitted.",
           DiskBalancerException.Result.DATANODE_STATUS_NOT_REGULAR);
     }
-    // TODO : Support force option
+
     this.diskBalancer.submitPlan(planID, planVersion, planFile, planData,
             skipDateCheck);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
index 3a348c9fac..c7cb089c5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
@@ -48,6 +48,8 @@ public class ExecuteCommand extends Command {
     super(conf);
     addValidCommandParameters(DiskBalancerCLI.EXECUTE,
         "Executes a given plan.");
+    addValidCommandParameters(DiskBalancerCLI.SKIPDATECHECK,
+        "skips the date check and force execute the plan");
   }
 
   /**
@@ -69,7 +71,16 @@ public class ExecuteCommand extends Command {
     try (FSDataInputStream plan = open(planFile)) {
       planData = IOUtils.toString(plan);
     }
-    submitPlan(planFile, planData);
+
+    boolean skipDateCheck = false;
+    if(cmd.hasOption(DiskBalancerCLI.SKIPDATECHECK)) {
+      skipDateCheck = true;
+      LOG.warn("Skipping date check on this plan. This could mean we are " +
+          "executing an old plan and may not be the right plan for this " +
+          "data node.");
+    }
+
+    submitPlan(planFile, planData, skipDateCheck);
   }
 
   /**
@@ -77,9 +88,11 @@ public class ExecuteCommand extends Command {
    *
    * @param planFile - Plan file name
    * @param planData - Plan data in json format
+   * @param skipDateCheck - skips date check
    * @throws IOException
    */
-  private void submitPlan(final String planFile, final String planData)
+  private void submitPlan(final String planFile, final String planData,
+                          boolean skipDateCheck)
           throws IOException {
     Preconditions.checkNotNull(planData);
     NodePlan plan = NodePlan.parseJson(planData);
@@ -88,9 +101,8 @@ public class ExecuteCommand extends Command {
     ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
     String planHash = DigestUtils.shaHex(planData);
     try {
-      // TODO : Support skipping date check.
       dataNode.submitDiskBalancerPlan(planHash, DiskBalancerCLI.PLAN_VERSION,
-                                      planFile, planData, false);
+                                      planFile, planData, skipDateCheck);
     } catch (DiskBalancerException ex) {
       LOG.error("Submitting plan on  {} failed. Result: {}, Message: {}",
           plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
index 9fdc838bf8..00e6f0499a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
@@ -84,6 +84,13 @@ public class DiskBalancerCLI extends Configured implements Tool {
    * Executes a given plan file on the target datanode.
    */
   public static final String EXECUTE = "execute";
+
+  /**
+   * Skips date check(now by default the plan is valid for 24 hours), and force
+   * execute the plan.
+   */
+  public static final String SKIPDATECHECK = "skipDateCheck";
+
   /**
    * The report command prints out a disk fragmentation report about the data
    * cluster. By default it prints the DEFAULT_TOP machines names with high
@@ -342,7 +349,15 @@ public class DiskBalancerCLI extends Configured implements Tool {
             "submits it for execution by the datanode.")
         .create();
     getExecuteOptions().addOption(execute);
+
+
+    Option skipDateCheck = OptionBuilder.withLongOpt(SKIPDATECHECK)
+        .withDescription("skips the date check and force execute the plan")
+        .create();
+    getExecuteOptions().addOption(skipDateCheck);
+
     opt.addOption(execute);
+    opt.addOption(skipDateCheck);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
index e999490886..f3100eae56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.OUTFILE;
 import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
 import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
 import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
+import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.SKIPDATECHECK;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
@@ -276,6 +277,45 @@ public class TestDiskBalancerCommand {
     }
   }
 
+  @Test(timeout = 600000)
+  public void testDiskBalancerForceExecute() throws
+      Exception {
+    final int numDatanodes = 1;
+
+    final Configuration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");
+
+    /* new cluster with imbalanced capacity */
+    final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
+        newImbalancedCluster(
+            hdfsConf,
+            numDatanodes,
+            CAPACITIES,
+            DEFAULT_BLOCK_SIZE,
+            FILE_LEN);
+
+    try {
+      /* get full path of plan */
+      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
+
+      /* run execute command */
+      final String cmdLine = String.format(
+          "hdfs diskbalancer -%s %s -%s",
+          EXECUTE,
+          planFileFullName,
+          SKIPDATECHECK);
+
+      // Disk Balancer should execute the plan, as skipDateCheck Option is
+      // specified
+      runCommand(cmdLine, hdfsConf, miniCluster);
+    }  finally{
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
+    }
+  }
+
 
   @Test(timeout = 600000)
   public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {