diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ef5bdd05cf..ce87883546 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -202,6 +202,9 @@ Release 0.23.1 - Unreleased HADOOP-7504. Add the missing Ganglia31 opts to hadoop-metrics.properties as a comment. (harsh) + HADOOP-7933. Add a getDelegationTokens api to FileSystem which checks + for known tokens in the passed Credentials object. (sseth) + OPTIMIZATIONS BUG FIXES @@ -235,6 +238,9 @@ Release 0.23.1 - Unreleased HADOOP-7837. no NullAppender in the log4j config. (eli) + HADOOP-7948. Shell scripts created by hadoop-dist/pom.xml to build tar do not + properly propagate failure. (cim_michajlomatijkiw via tucu) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/cluster_setup.xml b/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/cluster_setup.xml index 4fb057ff76..1a2de1dd00 100644 --- a/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/cluster_setup.xml +++ b/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/cluster_setup.xml @@ -628,8 +628,11 @@ conf/hdfs-site.xml dfs.blocksize - 134217728 - HDFS blocksize of 128MB for large file-systems. + 128m + + HDFS blocksize of 128 MB for large file-systems. Sizes can be provided + in size-prefixed values (10k, 128m, 1g, etc.) or simply in bytes (134217728 for 128 MB, etc.). + conf/hdfs-site.xml diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 71d05f2c3b..4fe9d77573 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -393,6 +394,40 @@ public Token getDelegationToken(String renewer) throws IOException { public List> getDelegationTokens(String renewer) throws IOException { return new ArrayList>(0); } + + /** + * @see #getDelegationTokens(String) + * This is similar to getDelegationTokens, with the added restriction that if + * a token is already present in the passed Credentials object - that token + * is returned instead of a new delegation token. + * + * If the token is found to be cached in the Credentials object, this API does + * not verify the token validity or the passed in renewer. + * + * + * @param renewer the account name that is allowed to renew the token. + * @param credentials a Credentials object containing already knowing + * delegationTokens. + * @return a list of delegation tokens. 
+ * @throws IOException + */ + @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" }) + public List> getDelegationTokens(String renewer, + Credentials credentials) throws IOException { + List> allTokens = getDelegationTokens(renewer); + List> newTokens = new ArrayList>(); + if (allTokens != null) { + for (Token token : allTokens) { + Token knownToken = credentials.getToken(token.getService()); + if (knownToken == null) { + newTokens.add(token); + } else { + newTokens.add(knownToken); + } + } + } + return newTokens; + } /** create a file with the provided permission * The permission of the file is set to be the provided permission as in diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index f0475c95f2..f59085c87a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -388,4 +389,11 @@ public Token getDelegationToken(String renewer) throws IOException { public List> getDelegationTokens(String renewer) throws IOException { return fs.getDelegationTokens(renewer); } -} + + @Override + // FileSystem + public List> getDelegationTokens(String renewer, + Credentials credentials) throws IOException { + return fs.getDelegationTokens(renewer, credentials); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java index b156194928..c1e290c012 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java @@ -24,7 +24,9 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.StringTokenizer; import java.util.Map.Entry; @@ -45,7 +47,9 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.viewfs.InodeTree.INode; import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -495,7 +499,40 @@ public List> getDelegationTokens(String renewer) throws IOException { } return result; } - + + @Override + public List> getDelegationTokens(String renewer, + Credentials credentials) throws IOException { + List> mountPoints = + fsState.getMountPoints(); + int initialListSize = 0; + for (InodeTree.MountPoint im : mountPoints) { + initialListSize += im.target.targetDirLinkList.length; + } + Set seenServiceNames = new HashSet(); + List> result = new ArrayList>(initialListSize); + for (int i = 0; i < mountPoints.size(); ++i) { + String serviceName = 
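/*
 * Illustrative usage sketch, not part of this patch: a submission-time helper that
 * collects delegation tokens for a FileSystem while reusing any token already held
 * in the supplied Credentials, which is what the new Credentials-aware overload of
 * getDelegationTokens() enables. Assumes the usual imports (java.io.IOException,
 * java.util.List, org.apache.hadoop.fs.FileSystem,
 * org.apache.hadoop.security.Credentials, org.apache.hadoop.security.token.Token);
 * the renewer string is a placeholder.
 */
static void collectTokens(FileSystem fs, Credentials credentials) throws IOException {
  // Tokens already cached in the Credentials object are returned as-is; only
  // genuinely new tokens are fetched, so repeated calls do not re-issue tokens.
  List<Token<?>> tokens = fs.getDelegationTokens("jobtracker/renewer", credentials);
  if (tokens != null) {
    for (Token<?> token : tokens) {
      // Re-adding an already cached token under its service name is harmless.
      credentials.addToken(token.getService(), token);
    }
  }
}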
+ mountPoints.get(i).target.targetFileSystem.getCanonicalServiceName(); + if (seenServiceNames.contains(serviceName)) { + continue; + } + seenServiceNames.add(serviceName); + Token knownToken = credentials.getToken(new Text(serviceName)); + if (knownToken != null) { + result.add(knownToken); + } else { + List> tokens = + mountPoints.get(i).target.targetFileSystem + .getDelegationTokens(renewer); + if (tokens != null) { + result.addAll(tokens); + } + } + } + return result; + } + /* * An instance of this class represents an internal dir of the viewFs * that is internal dir of the mount table. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java index dd18b14bb1..5276a06207 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem; import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.junit.After; import org.junit.Assert; @@ -89,6 +90,16 @@ public void setUp() throws Exception { // Set up the defaultMT in the config with our mount point links //Configuration conf = new Configuration(); conf = ViewFileSystemTestSetup.configWithViewfsScheme(); + setupMountPoints(); + fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf); + } + + @After + public void tearDown() throws Exception { + fsTarget.delete(FileSystemTestHelper.getTestRootPath(fsTarget), true); + } + + void setupMountPoints() { ConfigUtil.addLink(conf, "/user", new Path(targetTestRoot,"user").toUri()); ConfigUtil.addLink(conf, "/user2", new Path(targetTestRoot,"user").toUri()); ConfigUtil.addLink(conf, "/data", new Path(targetTestRoot,"data").toUri()); @@ -100,20 +111,17 @@ public void setUp() throws Exception { new Path(targetTestRoot,"missingTarget").toUri()); ConfigUtil.addLink(conf, "/linkToAFile", new Path(targetTestRoot,"aFile").toUri()); - - fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf); - } - - @After - public void tearDown() throws Exception { - fsTarget.delete(FileSystemTestHelper.getTestRootPath(fsTarget), true); } @Test public void testGetMountPoints() { ViewFileSystem viewfs = (ViewFileSystem) fsView; MountPoint[] mountPoints = viewfs.getMountPoints(); - Assert.assertEquals(7, mountPoints.length); + Assert.assertEquals(getExpectedMountPoints(), mountPoints.length); + } + + int getExpectedMountPoints() { + return 7; } /** @@ -125,9 +133,46 @@ public void testGetMountPoints() { public void testGetDelegationTokens() throws IOException { List> delTokens = fsView.getDelegationTokens("sanjay"); - Assert.assertEquals(0, delTokens.size()); + Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.size()); } + int getExpectedDelegationTokenCount() { + return 0; + } + + @Test + public void testGetDelegationTokensWithCredentials() throws IOException { + Credentials credentials = new Credentials(); + List> delTokens = + fsView.getDelegationTokens("sanjay", credentials); + + int expectedTokenCount = getExpectedDelegationTokenCountWithCredentials(); + + Assert.assertEquals(expectedTokenCount, delTokens.size()); + for (int i = 0; i < 
expectedTokenCount / 2; i++) { + Token token = delTokens.get(i); + credentials.addToken(token.getService(), token); + } + + List> delTokens2 = + fsView.getDelegationTokens("sanjay", credentials); + Assert.assertEquals(expectedTokenCount, delTokens2.size()); + + for (int i = 0; i < delTokens2.size(); i++) { + for (int j = 0; j < delTokens.size(); j++) { + if (delTokens.get(j) == delTokens2.get(i)) { + delTokens.remove(j); + break; + } + } + } + Assert.assertEquals(expectedTokenCount / 2, delTokens.size()); + } + + int getExpectedDelegationTokenCountWithCredentials() { + return 0; + } + @Test public void testBasicPaths() { Assert.assertEquals(FsConstants.VIEWFS_URI, @@ -340,7 +385,7 @@ public void testListOnInternalDirsOfMountTable() throws IOException { FileStatus[] dirPaths = fsView.listStatus(new Path("/")); FileStatus fs; - Assert.assertEquals(6, dirPaths.length); + Assert.assertEquals(getExpectedDirPaths(), dirPaths.length); fs = FileSystemTestHelper.containsPath(fsView, "/user", dirPaths); Assert.assertNotNull(fs); Assert.assertTrue("A mount should appear as symlink", fs.isSymlink()); @@ -372,6 +417,10 @@ public void testListOnInternalDirsOfMountTable() throws IOException { Assert.assertTrue("A mount should appear as symlink", fs.isSymlink()); } + int getExpectedDirPaths() { + return 6; + } + @Test public void testListOnMountTargetDirs() throws IOException { FileStatus[] dirPaths = fsView.listStatus(new Path("/data")); diff --git a/hadoop-dist/pom.xml b/hadoop-dist/pom.xml index 93fe32be25..888d533327 100644 --- a/hadoop-dist/pom.xml +++ b/hadoop-dist/pom.xml @@ -98,11 +98,12 @@ run() { echo "\$ ${@}" "${@}" - if [ $? != 0 ]; then + res=$? + if [ $res != 0 ]; then echo echo "Failed!" echo - exit $? + exit $res fi } @@ -139,11 +140,12 @@ run() { echo "\$ ${@}" "${@}" - if [ $? != 0 ]; then + res=$? + if [ $res != 0 ]; then echo echo "Failed!" echo - exit $? + exit $res fi } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2b478692d5..c1e7bd92e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -121,6 +121,8 @@ Trunk (unreleased changes) HDFS-2729. Update BlockManager's comments regarding the invalid block set (harsh) + HDFS-1314. Make dfs.blocksize accept size-indicating prefixes (Sho Shimauchi via harsh) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. 
(Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 529ee7177a..46fa863f3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -197,7 +197,7 @@ static class Conf { /** dfs.write.packet.size is an internal config variable */ writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - defaultBlockSize = conf.getLong(DFS_BLOCK_SIZE_KEY, + defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); defaultReplication = (short) conf.getInt( DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index eed58ecad4..8c932d71cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -119,7 +119,7 @@ synchronized void release() { conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT); - this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); //set up parameter for cluster balancing diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 16740fca89..ac20b0ee4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -714,7 +714,7 @@ private void setConfigurationParameters(Configuration conf) fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission)); this.serverDefaults = new FsServerDefaults( - conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT), + conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT), conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT), conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT), (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 01006d1aee..3a80206516 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -529,7 +529,7 @@ public void setTimes(final Path p, final long mtime, final long atime @Override public long getDefaultBlockSize() { - return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + return getConf().getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); } diff --git 
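/*
 * Illustrative sketch, not part of this patch: what switching from getLong() to
 * getLongBytes() buys for dfs.blocksize. The values are examples only; the key and
 * default constants are the ones used in the hunks above.
 */
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "128m");              // size-suffixed form
long blockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
    DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);                       // -> 134217728 bytes
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "134217728");         // plain bytes still accepted
blockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
    DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);                       // -> 134217728 bytes
// Note: conf.getLong() would fail on the suffixed form with a NumberFormatException,
// which is why the readers above are changed to getLongBytes().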
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BlockSizeParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BlockSizeParam.java index 4076746e34..b6d82c2c37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BlockSizeParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BlockSizeParam.java @@ -55,6 +55,6 @@ public String getName() { /** @return the value or, if it is null, return the default from conf. */ public long getValue(final Configuration conf) { return getValue() != null? getValue() - : conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); + : conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 668170d4da..2682c7f175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -341,7 +341,12 @@ creations/deletions), or "all". dfs.blocksize 67108864 - The default block size for new files. + + The default block size for new files, in bytes. + You can use the following suffix (case insensitive): + k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.), + Or provide complete size in bytes (such as 134217728 for 128 MB). + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java index 9834cb74a4..646edd4282 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java @@ -51,7 +51,7 @@ public void testBlockSizeParam() { final BlockSizeParam p = new BlockSizeParam(BlockSizeParam.DEFAULT); Assert.assertEquals(null, p.getValue()); Assert.assertEquals( - conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT), p.getValue(conf)); diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3917acae28..5e341a1909 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -52,6 +52,9 @@ Trunk (unreleased changes) MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh) BUG FIXES + MAPREDUCE-3462. Fix Gridmix JUnit testcase failures. + (Ravi Prakash and Ravi Gummadi via amarrk) + MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks. (Devaraj K and Amar Kamat via amarrk) @@ -175,6 +178,8 @@ Release 0.23.1 - Unreleased MAPREDUCE-3547. Added a bunch of unit tests for the the RM/NM webservices. (Thomas Graves via acmurthy) + MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar @@ -384,6 +389,12 @@ Release 0.23.1 - Unreleased MAPREDUCE-3608. Fixed compile issue with MAPREDUCE-3522. (mahadev via acmurthy) + MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce + ramp up. 
(Sharad Agarwal and Arun C Murthy via vinodkv) + + MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when + adding a file/archive to the path. (Dick King via tucu) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 183f15156c..f498590fdc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -858,8 +858,9 @@ private void makeUberDecision(long dataInputLength) { int sysMaxReduces = 1; long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, - conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is - // wrong; get FS from [File?]InputFormat and default block size from that + fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from + // [File?]InputFormat and default block size + // from that long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d55dc2981f..74e2c1b2a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.JobID; @@ -122,8 +123,6 @@ added to the pending and are ramped up (added to scheduled) based private boolean recalculateReduceSchedule = false; private int mapResourceReqt;//memory private int reduceResourceReqt;//memory - private int completedMaps = 0; - private int completedReduces = 0; private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; @@ -169,7 +168,13 @@ protected synchronized void heartbeat() throws Exception { if (recalculateReduceSchedule) { preemptReducesIfNeeded(); - scheduleReduces(); + scheduleReduces( + getJob().getTotalMaps(), getJob().getCompletedMaps(), + scheduledRequests.maps.size(), scheduledRequests.reduces.size(), + assignedRequests.maps.size(), assignedRequests.reduces.size(), + mapResourceReqt, reduceResourceReqt, + pendingReduces.size(), + maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; } } @@ -180,6 +185,14 @@ public void stop() { LOG.info("Final Stats: " + getStat()); } + public boolean getIsReduceStarted() { + return reduceStarted; + } + + public void setIsReduceStarted(boolean reduceStarted) { + this.reduceStarted = reduceStarted; + } + @SuppressWarnings("unchecked") @Override public 
synchronized void handle(ContainerAllocatorEvent event) { @@ -319,10 +332,17 @@ private void preemptReducesIfNeeded() { } } } - - private void scheduleReduces() { + + @Private + public void scheduleReduces( + int totalMaps, int completedMaps, + int scheduledMaps, int scheduledReduces, + int assignedMaps, int assignedReduces, + int mapResourceReqt, int reduceResourceReqt, + int numPendingReduces, + float maxReduceRampupLimit, float reduceSlowStart) { - if (pendingReduces.size() == 0) { + if (numPendingReduces == 0) { return; } @@ -330,29 +350,25 @@ private void scheduleReduces() { //if all maps are assigned, then ramp up all reduces irrespective of the //headroom - if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) { - LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size()); - for (ContainerRequest req : pendingReduces) { - scheduledRequests.addReduce(req); - } - pendingReduces.clear(); + if (scheduledMaps == 0 && numPendingReduces > 0) { + LOG.info("All maps assigned. " + + "Ramping up all remaining reduces:" + numPendingReduces); + scheduleAllReduces(); return; } - - int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size(); - //check for slow start - if (!reduceStarted) {//not set yet + if (!getIsReduceStarted()) {//not set yet int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * totalMaps); if(completedMaps < completedMapsForReduceSlowstart) { LOG.info("Reduce slow start threshold not met. " + - "completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart); + "completedMapsForReduceSlowstart " + + completedMapsForReduceSlowstart); return; } else { LOG.info("Reduce slow start threshold reached. Scheduling reduces."); - reduceStarted = true; + setIsReduceStarted(true); } } @@ -363,20 +379,21 @@ private void scheduleReduces() { completedMapPercent = 1; } - int netScheduledMapMem = scheduledRequests.maps.size() * mapResourceReqt - + assignedRequests.maps.size() * mapResourceReqt; + int netScheduledMapMem = + (scheduledMaps + assignedMaps) * mapResourceReqt; - int netScheduledReduceMem = scheduledRequests.reduces.size() - * reduceResourceReqt + assignedRequests.reduces.size() - * reduceResourceReqt; + int netScheduledReduceMem = + (scheduledReduces + assignedReduces) * reduceResourceReqt; int finalMapMemLimit = 0; int finalReduceMemLimit = 0; // ramp up the reduces based on completed map percentage int totalMemLimit = getMemLimit(); - int idealReduceMemLimit = Math.min((int)(completedMapPercent * totalMemLimit), - (int) (maxReduceRampupLimit * totalMemLimit)); + int idealReduceMemLimit = + Math.min( + (int)(completedMapPercent * totalMemLimit), + (int) (maxReduceRampupLimit * totalMemLimit)); int idealMapMemLimit = totalMemLimit - idealReduceMemLimit; // check if there aren't enough maps scheduled, give the free map capacity @@ -397,29 +414,46 @@ private void scheduleReduces() { " netScheduledMapMem:" + netScheduledMapMem + " netScheduledReduceMem:" + netScheduledReduceMem); - int rampUp = (finalReduceMemLimit - netScheduledReduceMem) - / reduceResourceReqt; + int rampUp = + (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt; if (rampUp > 0) { - rampUp = Math.min(rampUp, pendingReduces.size()); + rampUp = Math.min(rampUp, numPendingReduces); LOG.info("Ramping up " + rampUp); - //more reduce to be scheduled - for (int i = 0; i < rampUp; i++) { - ContainerRequest request = pendingReduces.removeFirst(); - scheduledRequests.addReduce(request); - } + 
rampUpReduces(rampUp); } else if (rampUp < 0){ int rampDown = -1 * rampUp; - rampDown = Math.min(rampDown, scheduledRequests.reduces.size()); + rampDown = Math.min(rampDown, scheduledReduces); LOG.info("Ramping down " + rampDown); - //remove from the scheduled and move back to pending - for (int i = 0; i < rampDown; i++) { - ContainerRequest request = scheduledRequests.removeReduce(); - pendingReduces.add(request); - } + rampDownReduces(rampDown); } } + private void scheduleAllReduces() { + for (ContainerRequest req : pendingReduces) { + scheduledRequests.addReduce(req); + } + pendingReduces.clear(); + } + + @Private + public void rampUpReduces(int rampUp) { + //more reduce to be scheduled + for (int i = 0; i < rampUp; i++) { + ContainerRequest request = pendingReduces.removeFirst(); + scheduledRequests.addReduce(request); + } + } + + @Private + public void rampDownReduces(int rampDown) { + //remove from the scheduled and move back to pending + for (int i = 0; i < rampDown; i++) { + ContainerRequest request = scheduledRequests.removeReduce(); + pendingReduces.add(request); + } + } + /** * Synchronized to avoid findbugs warnings */ @@ -429,8 +463,8 @@ private synchronized String getStat() { " ScheduledReduces:" + scheduledRequests.reduces.size() + " AssignedMaps:" + assignedRequests.maps.size() + " AssignedReduces:" + assignedRequests.reduces.size() + - " completedMaps:" + completedMaps + - " completedReduces:" + completedReduces + + " completedMaps:" + getJob().getCompletedMaps() + + " completedReduces:" + getJob().getCompletedReduces() + " containersAllocated:" + containersAllocated + " containersReleased:" + containersReleased + " hostLocalAssigned:" + hostLocalAssigned + @@ -497,11 +531,7 @@ private List getResources() throws Exception { + cont.getContainerId()); } else { assignedRequests.remove(attemptID); - if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) { - completedMaps++; - } else { - completedReduces++; - } + // send the container completed event to Task attempt eventHandler.handle(new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED)); @@ -514,7 +544,8 @@ private List getResources() throws Exception { return newContainers; } - private int getMemLimit() { + @Private + public int getMemLimit() { int headRoom = getAvailableResources() != null ? 
getAvailableResources().getMemory() : 0; return headRoom + assignedRequests.maps.size() * mapResourceReqt + assignedRequests.reduces.size() * reduceResourceReqt; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index de3909ea42..a4b84b2b53 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -19,8 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.IOException; import java.util.ArrayList; @@ -1218,6 +1217,70 @@ protected void startAllocatorThread() { } + @Test + public void testReduceScheduling() throws Exception { + int totalMaps = 10; + int succeededMaps = 1; + int scheduledMaps = 10; + int scheduledReduces = 0; + int assignedMaps = 2; + int assignedReduces = 0; + int mapResourceReqt = 1024; + int reduceResourceReqt = 2*1024; + int numPendingReduces = 4; + float maxReduceRampupLimit = 0.5f; + float reduceSlowStart = 0.2f; + + RMContainerAllocator allocator = mock(RMContainerAllocator.class); + doCallRealMethod().when(allocator). + scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), + anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat()); + + // Test slow-start + allocator.scheduleReduces( + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, + maxReduceRampupLimit, reduceSlowStart); + verify(allocator, never()).setIsReduceStarted(true); + + succeededMaps = 3; + allocator.scheduleReduces( + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, + maxReduceRampupLimit, reduceSlowStart); + verify(allocator, times(1)).setIsReduceStarted(true); + + // Test reduce ramp-up + doReturn(100 * 1024).when(allocator).getMemLimit(); + allocator.scheduleReduces( + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, + maxReduceRampupLimit, reduceSlowStart); + verify(allocator).rampUpReduces(anyInt()); + verify(allocator, never()).rampDownReduces(anyInt()); + + // Test reduce ramp-down + scheduledReduces = 3; + doReturn(10 * 1024).when(allocator).getMemLimit(); + allocator.scheduleReduces( + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, + maxReduceRampupLimit, reduceSlowStart); + verify(allocator).rampDownReduces(anyInt()); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java 
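/*
 * Worked numbers (illustration only) for the scheduleReduces() parameters exercised by
 * testReduceScheduling above. The exact split of the memory limit between maps and
 * reduces depends on code elided from this hunk, so finalReduceMemLimit is treated
 * here as an assumed input rather than a computed value.
 */
int totalMaps = 10;
float reduceSlowStart = 0.2f;
// Slow start: ceil(0.2 * 10) = 2 completed maps are required before reduces start,
// so the test's first call (1 succeeded map) never sets isReduceStarted while the
// second call (3 succeeded maps) does.
int completedMapsForReduceSlowstart = (int) Math.ceil(reduceSlowStart * totalMaps);

int reduceResourceReqt = 2 * 1024;
int numPendingReduces = 4;
int netScheduledReduceMem = 0;
int finalReduceMemLimit = 90 * 1024;   // assumed outcome of the map/reduce headroom split
// Ramp-up is capped by the number of pending reduces: (92160 - 0) / 2048 = 45, capped to 4.
int rampUp = Math.min(
    (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt,
    numPendingReduces);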
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index e4351536c8..88de3f88c1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1030,7 +1030,7 @@ public void addCacheFile(URI uri) { public void addFileToClassPath(Path file) throws IOException { ensureState(JobState.DEFINE); - DistributedCache.addFileToClassPath(file, conf); + DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf)); } /** @@ -1045,7 +1045,7 @@ public void addFileToClassPath(Path file) public void addArchiveToClassPath(Path archive) throws IOException { ensureState(JobState.DEFINE); - DistributedCache.addArchiveToClassPath(archive, conf); + DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java index 8b1d3a61f4..4040358142 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java @@ -269,7 +269,7 @@ public static void addCacheFile(URI uri, Configuration conf) { /** * Add an file path to the current set of classpath entries It adds the file * to cache as well. Intended to be used by user code. - * + * * @param file Path of the file to be added * @param conf Configuration that contains the classpath setting * @deprecated Use {@link Job#addFileToClassPath(Path)} instead @@ -277,12 +277,25 @@ public static void addCacheFile(URI uri, Configuration conf) { @Deprecated public static void addFileToClassPath(Path file, Configuration conf) throws IOException { + addFileToClassPath(file, conf, file.getFileSystem(conf)); + } + + /** + * Add a file path to the current set of classpath entries. It adds the file + * to cache as well. Intended to be used by user code. + * + * @param file Path of the file to be added + * @param conf Configuration that contains the classpath setting + * @param fs FileSystem with respect to which {@code archivefile} should + * be interpreted. + */ + public static void addFileToClassPath + (Path file, Configuration conf, FileSystem fs) + throws IOException { String classpath = conf.get(MRJobConfig.CLASSPATH_FILES); conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString() : classpath + "," + file.toString()); - FileSystem fs = FileSystem.get(conf); URI uri = fs.makeQualified(file).toUri(); - addCacheFile(uri, conf); } @@ -318,10 +331,23 @@ public static Path[] getFileClassPaths(Configuration conf) { @Deprecated public static void addArchiveToClassPath(Path archive, Configuration conf) throws IOException { + addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); + } + + /** + * Add an archive path to the current set of classpath entries. It adds the + * archive to cache as well. Intended to be used by user code. 
+ * + * @param archive Path of the archive to be added + * @param conf Configuration that contains the classpath setting + * @param fs FileSystem with respect to which {@code archive} should be interpreted. + */ + public static void addArchiveToClassPath + (Path archive, Configuration conf, FileSystem fs) + throws IOException { String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES); conf.set(MRJobConfig.CLASSPATH_ARCHIVES, classpath == null ? archive .toString() : classpath + "," + archive.toString()); - FileSystem fs = FileSystem.get(conf); URI uri = fs.makeQualified(archive).toUri(); addCacheArchive(uri, conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java index 79a14fc40e..7af592297b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java @@ -196,7 +196,7 @@ static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, file.close(); } - DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf); + DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs); conf.setOutputCommitter(CustomOutputCommitter.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputKeyClass(LongWritable.class); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java index d1c1b9881d..51071a07a0 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java @@ -105,6 +105,7 @@ public void testRandomCompressedTextDataGenerator() throws Exception { conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, wordSize); conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize); + conf.set("mapreduce.job.hdfs-servers", ""); FileSystem lfs = FileSystem.getLocal(conf); @@ -192,6 +193,7 @@ private void testCompressionRatioConfigure(float ratio) CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true); conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize); + conf.set("mapreduce.job.hdfs-servers", ""); float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO; if (ratio > 0) { diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java index 163cc8073b..0f1c7e2f71 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java @@ -141,6 +141,7 @@ private long[] configureDummyDistCacheFiles(Configuration conf, boolean useOldProperties) throws IOException { String 
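/*
 * Usage sketch, not part of this patch: qualifying a classpath entry against the file
 * system the path actually lives on, rather than the default file system from the
 * Configuration, which is the point of the new three-argument overloads. The path and
 * namenode address are examples only.
 */
Configuration conf = new Configuration();
Path jar = new Path("hdfs://nn.example.com:8020/libs/job.jar");   // example URI
FileSystem jarFs = jar.getFileSystem(conf);                       // FS of the path itself
DistributedCache.addFileToClassPath(jar, conf, jarFs);
// Equivalent call through the non-deprecated Job API after this change:
//   job.addFileToClassPath(jar);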
user = UserGroupInformation.getCurrentUser().getShortUserName(); conf.set(MRJobConfig.USER_NAME, user); + conf.set("mapreduce.job.hdfs-servers", ""); // Set some dummy dist cache files in gridmix configuration so that they go // into the configuration of JobStory objects. String[] distCacheFiles = {"hdfs:///tmp/file1.txt", diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java index 2815f248e3..802745522f 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java @@ -521,6 +521,7 @@ private void doSubmission(boolean useDefaultQueue, DebugGridmix client = new DebugGridmix(); conf = new Configuration(); conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy); + conf.set("mapreduce.job.hdfs-servers", ""); if (useDefaultQueue) { conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false); conf.set(GridmixJob.GRIDMIX_DEFAULT_QUEUE, "q1"); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java index cf39710bd8..84f292e770 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java @@ -205,6 +205,7 @@ public void testMapTasksOnlySleepJobs() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true); + conf.set("mapreduce.job.hdfs-servers", ""); DebugJobProducer jobProducer = new DebugJobProducer(5, conf); JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf)); UserGroupInformation ugi = UserGroupInformation.getLoginUser(); @@ -253,6 +254,7 @@ private void doSubmission(String...optional) throws Exception { DebugGridmix client = new DebugGridmix(); conf = new Configuration(); conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy); + conf.set("mapreduce.job.hdfs-servers", ""); conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf)); // allow synthetic users to create home directories GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777)); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java index 4915be5f86..9369ffc740 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java @@ -1144,7 +1144,7 @@ private void splitRealFiles(String[] args) throws IOException { if (!(fs instanceof DistributedFileSystem)) { throw new IOException("Wrong file system: " + fs.getClass().getName()); } - int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024); + long blockSize = fs.getDefaultBlockSize(); DummyInputFormat inFormat = new DummyInputFormat(); for (int i = 0; i < args.length; i++) {
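/*
 * Sketch, not part of this patch: the pattern the MAPREDUCE-3610 hunks move toward --
 * asking the FileSystem for its default block size instead of re-reading the
 * HDFS-specific "dfs.block.size" key out of the job configuration. The fallback split
 * size is a hypothetical downstream use.
 */
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
long blockSize = fs.getDefaultBlockSize();            // replaces conf.getInt("dfs.block.size", ...)
long splitSize = Math.max(blockSize, 32 * 1024 * 1024L);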