diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java new file mode 100644 index 0000000000..6ef75d2836 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Interface for class that can tell estimate much space + * is used in a directory. + *
+ * The implementor is fee to cache space used. As such there + * are methods to update the cached value with any known changes. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) +@InterfaceStability.Evolving +public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed { + static final Logger LOG = LoggerFactory.getLogger(CachingGetSpaceUsed.class); + + protected final AtomicLong used = new AtomicLong(); + private final AtomicBoolean running = new AtomicBoolean(true); + private final long refreshInterval; + private final String dirPath; + private Thread refreshUsed; + + /** + * This is the constructor used by the builder. + * All overriding classes should implement this. + */ + public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder) + throws IOException { + this(builder.getPath(), builder.getInterval(), builder.getInitialUsed()); + } + + /** + * Keeps track of disk usage. + * + * @param path the path to check disk usage in + * @param interval refresh the disk usage at this interval + * @param initialUsed use this value until next refresh + * @throws IOException if we fail to refresh the disk usage + */ + CachingGetSpaceUsed(File path, + long interval, + long initialUsed) throws IOException { + dirPath = path.getCanonicalPath(); + refreshInterval = interval; + used.set(initialUsed); + } + + void init() { + if (used.get() < 0) { + used.set(0); + refresh(); + } + + if (refreshInterval > 0) { + refreshUsed = new Thread(new RefreshThread(this), + "refreshUsed-" + dirPath); + refreshUsed.setDaemon(true); + refreshUsed.start(); + } else { + running.set(false); + refreshUsed = null; + } + } + + protected abstract void refresh(); + + /** + * @return an estimate of space used in the directory path. + */ + @Override public long getUsed() throws IOException { + return Math.max(used.get(), 0); + } + + /** + * @return The directory path being monitored. + */ + public String getDirPath() { + return dirPath; + } + + /** + * Increment the cached value of used space. + */ + public void incDfsUsed(long value) { + used.addAndGet(value); + } + + /** + * Is the background thread running. + */ + boolean running() { + return running.get(); + } + + /** + * How long in between runs of the background refresh. + */ + long getRefreshInterval() { + return refreshInterval; + } + + /** + * Reset the current used data amount. This should be called + * when the cached value is re-computed. + * + * @param usedValue new value that should be the disk usage. + */ + protected void setUsed(long usedValue) { + this.used.set(usedValue); + } + + @Override + public void close() throws IOException { + running.set(false); + if (refreshUsed != null) { + refreshUsed.interrupt(); + } + } + + private static final class RefreshThread implements Runnable { + + final CachingGetSpaceUsed spaceUsed; + + RefreshThread(CachingGetSpaceUsed spaceUsed) { + this.spaceUsed = spaceUsed; + } + + @Override + public void run() { + while (spaceUsed.running()) { + try { + Thread.sleep(spaceUsed.getRefreshInterval()); + // update the used variable + spaceUsed.refresh(); + } catch (InterruptedException e) { + LOG.warn("Thread Interrupted waiting to refresh disk information", e); + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java index 5a4f52648b..f700e4f8e5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java @@ -17,227 +17,73 @@ */ package org.apache.hadoop.fs; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.util.Shell; import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; -/** Filesystem disk space usage statistics. Uses the unix 'du' program*/ +/** Filesystem disk space usage statistics. Uses the unix 'du' program */ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving -public class DU extends Shell { - private String dirPath; +public class DU extends CachingGetSpaceUsed { + private DUShell duShell; - private AtomicLong used = new AtomicLong(); - private volatile boolean shouldRun = true; - private Thread refreshUsed; - private IOException duException = null; - private long refreshInterval; - - /** - * Keeps track of disk usage. - * @param path the path to check disk usage in - * @param interval refresh the disk usage at this interval - * @throws IOException if we fail to refresh the disk usage - */ - public DU(File path, long interval) throws IOException { - this(path, interval, -1L); + @VisibleForTesting + public DU(File path, long interval, long initialUsed) throws IOException { + super(path, interval, initialUsed); } - - /** - * Keeps track of disk usage. - * @param path the path to check disk usage in - * @param interval refresh the disk usage at this interval - * @param initialUsed use this value until next refresh - * @throws IOException if we fail to refresh the disk usage - */ - public DU(File path, long interval, long initialUsed) throws IOException { - super(0); - //we set the Shell interval to 0 so it will always run our command - //and use this one to set the thread sleep interval - this.refreshInterval = interval; - this.dirPath = path.getCanonicalPath(); + public DU(CachingGetSpaceUsed.Builder builder) throws IOException { + this(builder.getPath(), builder.getInterval(), builder.getInitialUsed()); + } - //populate the used variable if the initial value is not specified. - if (initialUsed < 0) { - run(); - } else { - this.used.set(initialUsed); + @Override + protected synchronized void refresh() { + if (duShell == null) { + duShell = new DUShell(); + } + try { + duShell.startRefresh(); + } catch (IOException ioe) { + LOG.warn("Could not get disk usage information", ioe); } } - /** - * Keeps track of disk usage. - * @param path the path to check disk usage in - * @param conf configuration object - * @throws IOException if we fail to refresh the disk usage - */ - public DU(File path, Configuration conf) throws IOException { - this(path, conf, -1L); - } - - /** - * Keeps track of disk usage. - * @param path the path to check disk usage in - * @param conf configuration object - * @param initialUsed use it until the next refresh. - * @throws IOException if we fail to refresh the disk usage - */ - public DU(File path, Configuration conf, long initialUsed) - throws IOException { - this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, - CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed); - } - - - - /** - * This thread refreshes the "used" variable. - * - * Future improvements could be to not permanently - * run this thread, instead run when getUsed is called. - **/ - class DURefreshThread implements Runnable { - + private final class DUShell extends Shell { + void startRefresh() throws IOException { + run(); + } @Override - public void run() { - - while(shouldRun) { + public String toString() { + return + "du -sk " + getDirPath() + "\n" + used.get() + "\t" + getDirPath(); + } - try { - Thread.sleep(refreshInterval); - - try { - //update the used variable - DU.this.run(); - } catch (IOException e) { - synchronized (DU.this) { - //save the latest exception so we can return it in getUsed() - duException = e; - } - - LOG.warn("Could not get disk usage information", e); - } - } catch (InterruptedException e) { - } + @Override + protected String[] getExecString() { + return new String[]{"du", "-sk", getDirPath()}; + } + + @Override + protected void parseExecResult(BufferedReader lines) throws IOException { + String line = lines.readLine(); + if (line == null) { + throw new IOException("Expecting a line not the end of stream"); } - } - } - - /** - * Decrease how much disk space we use. - * @param value decrease by this value - */ - public void decDfsUsed(long value) { - used.addAndGet(-value); - } - - /** - * Increase how much disk space we use. - * @param value increase by this value - */ - public void incDfsUsed(long value) { - used.addAndGet(value); - } - - /** - * @return disk space used - * @throws IOException if the shell command fails - */ - public long getUsed() throws IOException { - //if the updating thread isn't started, update on demand - if(refreshUsed == null) { - run(); - } else { - synchronized (DU.this) { - //if an exception was thrown in the last run, rethrow - if(duException != null) { - IOException tmp = duException; - duException = null; - throw tmp; - } + String[] tokens = line.split("\t"); + if (tokens.length == 0) { + throw new IOException("Illegal du output"); } + setUsed(Long.parseLong(tokens[0]) * 1024); } - - return Math.max(used.longValue(), 0L); + } - /** - * @return the path of which we're keeping track of disk usage - */ - public String getDirPath() { - return dirPath; - } - - - /** - * Override to hook in DUHelper class. Maybe this can be used more - * generally as well on Unix/Linux based systems - */ - @Override - protected void run() throws IOException { - if (WINDOWS) { - used.set(DUHelper.getFolderUsage(dirPath)); - return; - } - super.run(); - } - - /** - * Start the disk usage checking thread. - */ - public void start() { - //only start the thread if the interval is sane - if(refreshInterval > 0) { - refreshUsed = new Thread(new DURefreshThread(), - "refreshUsed-"+dirPath); - refreshUsed.setDaemon(true); - refreshUsed.start(); - } - } - - /** - * Shut down the refreshing thread. - */ - public void shutdown() { - this.shouldRun = false; - - if(this.refreshUsed != null) { - this.refreshUsed.interrupt(); - } - } - - @Override - public String toString() { - return - "du -sk " + dirPath +"\n" + - used + "\t" + dirPath; - } - - @Override - protected String[] getExecString() { - return new String[] {"du", "-sk", dirPath}; - } - - @Override - protected void parseExecResult(BufferedReader lines) throws IOException { - String line = lines.readLine(); - if (line == null) { - throw new IOException("Expecting a line not the end of stream"); - } - String[] tokens = line.split("\t"); - if(tokens.length == 0) { - throw new IOException("Illegal du output"); - } - this.used.set(Long.parseLong(tokens[0])*1024); - } public static void main(String[] args) throws Exception { String path = "."; @@ -245,6 +91,10 @@ public class DU extends Shell { path = args[0]; } - System.out.println(new DU(new File(path), new Configuration()).toString()); + GetSpaceUsed du = new GetSpaceUsed.Builder().setPath(new File(path)) + .setConf(new Configuration()) + .build(); + String duResult = du.toString(); + System.out.println(duResult); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java new file mode 100644 index 0000000000..aebc3f7969 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Shell; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +public interface GetSpaceUsed { + long getUsed() throws IOException; + + /** + * The builder class + */ + final class Builder { + static final Logger LOG = LoggerFactory.getLogger(Builder.class); + + static final String CLASSNAME_KEY = "fs.getspaceused.classname"; + + private Configuration conf; + private Class extends GetSpaceUsed> klass = null; + private File path = null; + private Long interval = null; + private Long initialUsed = null; + + public Configuration getConf() { + return conf; + } + + public Builder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + public long getInterval() { + if (interval != null) { + return interval; + } + long result = CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT; + if (conf == null) { + return result; + } + return conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, result); + } + + public Builder setInterval(long interval) { + this.interval = interval; + return this; + } + + public Class extends GetSpaceUsed> getKlass() { + if (klass != null) { + return klass; + } + Class extends GetSpaceUsed> result = null; + if (Shell.WINDOWS) { + result = WindowsGetSpaceUsed.class; + } else { + result = DU.class; + } + if (conf == null) { + return result; + } + return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class); + } + + public Builder setKlass(Class extends GetSpaceUsed> klass) { + this.klass = klass; + return this; + } + + public File getPath() { + return path; + } + + public Builder setPath(File path) { + this.path = path; + return this; + } + + public long getInitialUsed() { + if (initialUsed == null) { + return -1; + } + return initialUsed; + } + + public Builder setInitialUsed(long initialUsed) { + this.initialUsed = initialUsed; + return this; + } + + public GetSpaceUsed build() throws IOException { + GetSpaceUsed getSpaceUsed = null; + try { + Constructor extends GetSpaceUsed> cons = + getKlass().getConstructor(Builder.class); + getSpaceUsed = cons.newInstance(this); + } catch (InstantiationException e) { + LOG.warn("Error trying to create an instance of " + getKlass(), e); + } catch (IllegalAccessException e) { + LOG.warn("Error trying to create " + getKlass(), e); + } catch (InvocationTargetException e) { + LOG.warn("Error trying to create " + getKlass(), e); + } catch (NoSuchMethodException e) { + LOG.warn("Doesn't look like the class " + getKlass() + + " have the needed constructor", e); + } + // If there were any exceptions then du will be null. + // Construct our best guess fallback. + if (getSpaceUsed == null) { + if (Shell.WINDOWS) { + getSpaceUsed = new WindowsGetSpaceUsed(this); + } else { + getSpaceUsed = new DU(this); + } + } + // Call init after classes constructors have finished. + if (getSpaceUsed instanceof CachingGetSpaceUsed) { + ((CachingGetSpaceUsed) getSpaceUsed).init(); + } + return getSpaceUsed; + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java new file mode 100644 index 0000000000..deb1343bc6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Class to tell the size of a path on windows. + * Rather than shelling out, on windows this uses DUHelper.getFolderUsage + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) +@InterfaceStability.Evolving +public class WindowsGetSpaceUsed extends CachingGetSpaceUsed { + + + WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException { + super(builder.getPath(), builder.getInterval(), builder.getInitialUsed()); + } + + /** + * Override to hook in DUHelper class. + */ + @Override + protected void refresh() { + used.set(DUHelper.getFolderUsage(getDirPath())); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java index dded9fbd12..739263fab5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.test.GenericTestUtils; -/** This test makes sure that "DU" does not get to run on each call to getUsed */ +/** This test makes sure that "DU" does not get to run on each call to getUsed */ public class TestDU extends TestCase { final static private File DU_DIR = GenericTestUtils.getTestDir("dutmp"); @@ -42,7 +42,7 @@ public class TestDU extends TestCase { public void tearDown() throws IOException { FileUtil.fullyDelete(DU_DIR); } - + private void createFile(File newFile, int size) throws IOException { // write random data so that filesystems with compression enabled (e.g., ZFS) // can't compress the file @@ -54,18 +54,18 @@ public class TestDU extends TestCase { RandomAccessFile file = new RandomAccessFile(newFile, "rws"); file.write(data); - + file.getFD().sync(); file.close(); } /** * Verify that du returns expected used space for a file. - * We assume here that if a file system crates a file of size + * We assume here that if a file system crates a file of size * that is a multiple of the block size in this file system, * then the used size for the file will be exactly that size. * This is true for most file systems. - * + * * @throws IOException * @throws InterruptedException */ @@ -78,28 +78,29 @@ public class TestDU extends TestCase { createFile(file, writtenSize); Thread.sleep(5000); // let the metadata updater catch up - - DU du = new DU(file, 10000); - du.start(); + + DU du = new DU(file, 10000, -1); + du.init(); long duSize = du.getUsed(); - du.shutdown(); + du.close(); assertTrue("Invalid on-disk size", duSize >= writtenSize && writtenSize <= (duSize + slack)); - - //test with 0 interval, will not launch thread - du = new DU(file, 0); - du.start(); + + //test with 0 interval, will not launch thread + du = new DU(file, 0, -1); + du.init(); duSize = du.getUsed(); - du.shutdown(); - + du.close(); + assertTrue("Invalid on-disk size", duSize >= writtenSize && writtenSize <= (duSize + slack)); - - //test without launching thread - du = new DU(file, 10000); + + //test without launching thread + du = new DU(file, 10000, -1); + du.init(); duSize = du.getUsed(); assertTrue("Invalid on-disk size", @@ -111,8 +112,8 @@ public class TestDU extends TestCase { assertTrue(file.createNewFile()); Configuration conf = new Configuration(); conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L); - DU du = new DU(file, conf); - du.decDfsUsed(Long.MAX_VALUE); + DU du = new DU(file, 10000L, -1); + du.incDfsUsed(-Long.MAX_VALUE); long duSize = du.getUsed(); assertTrue(String.valueOf(duSize), duSize >= 0L); } @@ -121,7 +122,7 @@ public class TestDU extends TestCase { File file = new File(DU_DIR, "dataX"); createFile(file, 8192); DU du = new DU(file, 3000, 1024); - du.start(); + du.init(); assertTrue("Initial usage setting not honored", du.getUsed() == 1024); // wait until the first du runs. @@ -131,4 +132,7 @@ public class TestDU extends TestCase { assertTrue("Usage didn't get updated", du.getUsed() == 8192); } + + + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java new file mode 100644 index 0000000000..f436713ea6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.*; + +public class TestGetSpaceUsed { + final static private File DIR = new File( + System.getProperty("test.build.data", "/tmp"), "TestGetSpaceUsed"); + + @Before + public void setUp() { + FileUtil.fullyDelete(DIR); + assertTrue(DIR.mkdirs()); + } + + @After + public void tearDown() throws IOException { + FileUtil.fullyDelete(DIR); + } + + /** + * Test that the builder can create a class specified through the class. + */ + @Test + public void testBuilderConf() throws Exception { + File file = new File(DIR, "testBuilderConf"); + assertTrue(file.createNewFile()); + Configuration conf = new Configuration(); + conf.set("fs.getspaceused.classname", DummyDU.class.getName()); + CachingGetSpaceUsed instance = + (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder() + .setPath(file) + .setInterval(0) + .setConf(conf) + .build(); + assertNotNull(instance); + assertTrue(instance instanceof DummyDU); + assertFalse(instance.running()); + instance.close(); + } + + @Test + public void testBuildInitial() throws Exception { + File file = new File(DIR, "testBuildInitial"); + assertTrue(file.createNewFile()); + CachingGetSpaceUsed instance = + (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder() + .setPath(file) + .setInitialUsed(90210) + .setKlass(DummyDU.class) + .build(); + assertEquals(90210, instance.getUsed()); + instance.close(); + } + + @Test + public void testBuildInterval() throws Exception { + File file = new File(DIR, "testBuildInitial"); + assertTrue(file.createNewFile()); + CachingGetSpaceUsed instance = + (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder() + .setPath(file) + .setInitialUsed(90210) + .setInterval(50060) + .setKlass(DummyDU.class) + .build(); + assertEquals(50060, instance.getRefreshInterval()); + instance.close(); + } + + @Test + public void testBuildNonCaching() throws Exception { + File file = new File(DIR, "testBuildNonCaching"); + assertTrue(file.createNewFile()); + GetSpaceUsed instance = new CachingGetSpaceUsed.Builder() + .setPath(file) + .setInitialUsed(90210) + .setInterval(50060) + .setKlass(DummyGetSpaceUsed.class) + .build(); + assertEquals(300, instance.getUsed()); + assertTrue(instance instanceof DummyGetSpaceUsed); + } + + private static class DummyDU extends CachingGetSpaceUsed { + + public DummyDU(Builder builder) throws IOException { + // Push to the base class. + // Most times that's all that will need to be done. + super(builder); + } + + @Override + protected void refresh() { + // This is a test so don't du anything. + } + } + + private static class DummyGetSpaceUsed implements GetSpaceUsed { + + public DummyGetSpaceUsed(GetSpaceUsed.Builder builder) { + + } + + @Override public long getUsed() throws IOException { + return 300; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index aa33851cca..cb8e4838aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -36,8 +36,9 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.DU; +import org.apache.hadoop.fs.CachingGetSpaceUsed; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.GetSpaceUsed; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.Block; @@ -62,10 +63,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; /** - * A block pool slice represents a portion of a block pool stored on a volume. - * Taken together, all BlockPoolSlices sharing a block pool ID across a + * A block pool slice represents a portion of a block pool stored on a volume. + * Taken together, all BlockPoolSlices sharing a block pool ID across a * cluster represent a single block pool. - * + * * This class is synchronized by {@link FsVolumeImpl}. */ class BlockPoolSlice { @@ -92,10 +93,10 @@ class BlockPoolSlice { private final Timer timer; // TODO:FEDERATION scalability issue - a thread per DU is needed - private final DU dfsUsage; + private final GetSpaceUsed dfsUsage; /** - * Create a blook pool slice + * Create a blook pool slice * @param bpid Block pool Id * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to * @param bpDir directory corresponding to the BlockPool @@ -107,7 +108,7 @@ class BlockPoolSlice { Configuration conf, Timer timer) throws IOException { this.bpid = bpid; this.volume = volume; - this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); + this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); this.finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST); @@ -151,8 +152,10 @@ class BlockPoolSlice { } // Use cached value initially if available. Or the following call will // block until the initial du command completes. - this.dfsUsage = new DU(bpDir, conf, loadDfsUsed()); - this.dfsUsage.start(); + this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir) + .setConf(conf) + .setInitialUsed(loadDfsUsed()) + .build(); // Make the dfs usage to be saved during shutdown. ShutdownHookManager.get().addShutdownHook( @@ -173,7 +176,7 @@ class BlockPoolSlice { File getFinalizedDir() { return finalizedDir; } - + File getLazypersistDir() { return lazypersistDir; } @@ -188,17 +191,21 @@ class BlockPoolSlice { /** Run DU on local drives. It must be synchronized from caller. */ void decDfsUsed(long value) { - dfsUsage.decDfsUsed(value); + if (dfsUsage instanceof CachingGetSpaceUsed) { + ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(-value); + } } - + long getDfsUsed() throws IOException { return dfsUsage.getUsed(); } void incDfsUsed(long value) { - dfsUsage.incDfsUsed(value); + if (dfsUsage instanceof CachingGetSpaceUsed) { + ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(value); + } } - + /** * Read in the cached DU value and return it if it is less than * cachedDfsUsedCheckTime which is set by @@ -304,7 +311,10 @@ class BlockPoolSlice { } File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir); File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); - dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); + if (dfsUsage instanceof CachingGetSpaceUsed) { + ((CachingGetSpaceUsed) dfsUsage).incDfsUsed( + b.getNumBytes() + metaFile.length()); + } return blockFile; } @@ -331,7 +341,7 @@ class BlockPoolSlice { } - + void getVolumeMap(ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap) throws IOException { @@ -342,7 +352,7 @@ class BlockPoolSlice { FsDatasetImpl.LOG.info( "Recovered " + numRecovered + " replicas from " + lazypersistDir); } - + boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap); if (!success) { // add finalized replicas @@ -436,7 +446,7 @@ class BlockPoolSlice { FileUtil.fullyDelete(source); return numRecovered; } - + private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized) throws IOException { @@ -444,7 +454,7 @@ class BlockPoolSlice { long blockId = block.getBlockId(); long genStamp = block.getGenerationStamp(); if (isFinalized) { - newReplica = new FinalizedReplica(blockId, + newReplica = new FinalizedReplica(blockId, block.getNumBytes(), genStamp, volume, DatanodeUtil .idToBlockDir(finalizedDir, blockId)); } else { @@ -461,7 +471,7 @@ class BlockPoolSlice { // We don't know the expected block length, so just use 0 // and don't reserve any more space for writes. newReplica = new ReplicaBeingWritten(blockId, - validateIntegrityAndSetLength(file, genStamp), + validateIntegrityAndSetLength(file, genStamp), genStamp, volume, file.getParentFile(), null, 0); loadRwr = false; } @@ -507,7 +517,7 @@ class BlockPoolSlice { incrNumBlocks(); } } - + /** * Add replicas under the given directory to the volume map @@ -537,12 +547,12 @@ class BlockPoolSlice { } if (!Block.isBlockFilename(file)) continue; - + long genStamp = FsDatasetUtil.getGenerationStampFromFile( files, file); long blockId = Block.filename2id(file.getName()); - Block block = new Block(blockId, file.length(), genStamp); - addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, + Block block = new Block(blockId, file.length(), genStamp); + addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, isFinalized); } } @@ -636,11 +646,11 @@ class BlockPoolSlice { /** * Find out the number of bytes in the block that match its crc. - * - * This algorithm assumes that data corruption caused by unexpected + * + * This algorithm assumes that data corruption caused by unexpected * datanode shutdown occurs only in the last crc chunk. So it checks * only the last chunk. - * + * * @param blockFile the block file * @param genStamp generation stamp of the block * @return the number of valid bytes @@ -667,7 +677,7 @@ class BlockPoolSlice { int bytesPerChecksum = checksum.getBytesPerChecksum(); int checksumSize = checksum.getChecksumSize(); long numChunks = Math.min( - (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, + (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, (metaFileLen - crcHeaderLen)/checksumSize); if (numChunks == 0) { return 0; @@ -710,17 +720,20 @@ class BlockPoolSlice { IOUtils.closeStream(blockIn); } } - + @Override public String toString() { return currentDir.getAbsolutePath(); } - + void shutdown(BlockListAsLongs blocksListToPersist) { saveReplicas(blocksListToPersist); saveDfsUsed(); dfsUsedSaved = true; - dfsUsage.shutdown(); + + if (dfsUsage instanceof CachingGetSpaceUsed) { + IOUtils.cleanup(LOG, ((CachingGetSpaceUsed) dfsUsage)); + } } private boolean readReplicasFromCache(ReplicaMap volumeMap, @@ -729,17 +742,17 @@ class BlockPoolSlice { File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); // Check whether the file exists or not. if (!replicaFile.exists()) { - LOG.info("Replica Cache file: "+ replicaFile.getPath() + + LOG.info("Replica Cache file: "+ replicaFile.getPath() + " doesn't exist "); return false; } long fileLastModifiedTime = replicaFile.lastModified(); if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) { - LOG.info("Replica Cache file: " + replicaFile.getPath() + + LOG.info("Replica Cache file: " + replicaFile.getPath() + " has gone stale"); // Just to make findbugs happy if (!replicaFile.delete()) { - LOG.info("Replica Cache file: " + replicaFile.getPath() + + LOG.info("Replica Cache file: " + replicaFile.getPath() + " cannot be deleted"); } return false; @@ -776,7 +789,7 @@ class BlockPoolSlice { iter.remove(); volumeMap.add(bpid, info); } - LOG.info("Successfully read replica from cache file : " + LOG.info("Successfully read replica from cache file : " + replicaFile.getPath()); return true; } catch (Exception e) { @@ -794,10 +807,10 @@ class BlockPoolSlice { // close the inputStream IOUtils.closeStream(inputStream); } - } - + } + private void saveReplicas(BlockListAsLongs blocksListToPersist) { - if (blocksListToPersist == null || + if (blocksListToPersist == null || blocksListToPersist.getNumberOfBlocks()== 0) { return; } @@ -813,7 +826,7 @@ class BlockPoolSlice { replicaCacheFile.getPath()); return; } - + FileOutputStream out = null; try { out = new FileOutputStream(tmpFile); @@ -827,7 +840,7 @@ class BlockPoolSlice { // and continue. LOG.warn("Failed to write replicas to cache ", e); if (replicaCacheFile.exists() && !replicaCacheFile.delete()) { - LOG.warn("Failed to delete replicas file: " + + LOG.warn("Failed to delete replicas file: " + replicaCacheFile.getPath()); } } finally {