From 302e3dfe2c9cd6640996066e890955b06d71bff5 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Thu, 17 Nov 2011 21:13:29 +0000 Subject: [PATCH 01/10] MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs cross MR1 and MR2. (Ahmed via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203371 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/MiniMRClientCluster.java | 38 +++ .../mapred/MiniMRClientClusterFactory.java | 70 +++++ .../apache/hadoop/mapred/MiniMRCluster.java | 262 ++++++++++++++++++ .../mapred/MiniMRYarnClusterAdapter.java | 51 ++++ .../mapred/TestMiniMRClientCluster.java | 170 ++++++++++++ 6 files changed, 594 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e4e086cb80..f64af4a4a2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -34,6 +34,9 @@ Trunk (unreleased changes) MAPREDUCE-3149. Add a test to verify that TokenCache handles file system uri with no authority. (John George via jitendra) + MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides + client APIs cross MR1 and MR2. (Ahmed via tucu) + BUG FIXES MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null. (amarrk) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java new file mode 100644 index 0000000000..dc4687b580 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientCluster.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/* + * A simple interface for a client MR cluster used for testing. This interface + * provides basic methods which are independent of the underlying Mini Cluster ( + * either through MR1 or MR2). + */ +public interface MiniMRClientCluster { + + public void start() throws IOException; + + public void stop() throws IOException; + + public Configuration getConfig() throws IOException; + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java new file mode 100644 index 0000000000..3e4b34b556 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; + +/** + * A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster + * interface around the MiniMRYarnCluster. While in MR1, it provides such + * wrapper around MiniMRCluster. This factory should be used in tests to provide + * an easy migration of tests across MR1 and MR2. + */ +public class MiniMRClientClusterFactory { + + public static MiniMRClientCluster create(Class caller, int noOfNMs, + Configuration conf) throws IOException { + + if (conf == null) { + conf = new Configuration(); + } + + FileSystem fs = FileSystem.get(conf); + + Path testRootDir = new Path("target", caller.getName() + "-tmpDir") + .makeQualified(fs); + Path appJar = new Path(testRootDir, "MRAppJar.jar"); + + // Copy MRAppJar and make it private. 
+ Path appMasterJar = new Path(MiniMRYarnCluster.APPJAR); + + fs.copyFromLocalFile(appMasterJar, appJar); + fs.setPermission(appJar, new FsPermission("700")); + + Job job = Job.getInstance(conf); + + job.addFileToClassPath(appJar); + job.setJarByClass(caller); + + MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller + .getName(), noOfNMs); + miniMRYarnCluster.init(job.getConfiguration()); + miniMRYarnCluster.start(); + + return new MiniMRYarnClusterAdapter(miniMRYarnCluster); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java new file mode 100644 index 0000000000..a73b959ec3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * This class is an MR2 replacement for older MR1 MiniMRCluster, that was used + * by tests prior to MR2. This replacement class uses the new MiniMRYarnCluster + * in MR2 but provides the same old MR1 interface, so tests can be migrated from + * MR1 to MR2 with minimal changes. + * + * Due to major differences between MR1 and MR2, a number of methods are either + * unimplemented/unsupported or were re-implemented to provide wrappers around + * MR2 functionality. 
+ */ +public class MiniMRCluster { + private static final Log LOG = LogFactory.getLog(MiniMRCluster.class); + + private MiniMRClientCluster mrClientCluster; + + public String getTaskTrackerLocalDir(int taskTracker) { + throw new UnsupportedOperationException(); + } + + public String[] getTaskTrackerLocalDirs(int taskTracker) { + throw new UnsupportedOperationException(); + } + + class JobTrackerRunner { + // Mock class + } + + class TaskTrackerRunner { + // Mock class + } + + public JobTrackerRunner getJobTrackerRunner() { + throw new UnsupportedOperationException(); + } + + TaskTrackerRunner getTaskTrackerRunner(int id) { + throw new UnsupportedOperationException(); + } + + public int getNumTaskTrackers() { + throw new UnsupportedOperationException(); + } + + public void setInlineCleanupThreads() { + throw new UnsupportedOperationException(); + } + + public void waitUntilIdle() { + throw new UnsupportedOperationException(); + } + + private void waitTaskTrackers() { + throw new UnsupportedOperationException(); + } + + public int getJobTrackerPort() { + throw new UnsupportedOperationException(); + } + + public JobConf createJobConf() { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + public JobConf createJobConf(JobConf conf) { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + static JobConf configureJobConf(JobConf conf, String namenode, + int jobTrackerPort, int jobTrackerInfoPort, UserGroupInformation ugi) { + throw new UnsupportedOperationException(); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, + String[] racks, String[] hosts) throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, + String[] racks, String[] hosts, JobConf conf) throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir, racks, hosts, null, conf); + } + + public MiniMRCluster(int numTaskTrackers, String namenode, int numDir) + throws IOException { + this(0, 0, numTaskTrackers, namenode, numDir); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks) + throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, null); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf) + throws IOException { 
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, conf, 0); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf, + int numTrackerToExclude) throws IOException { + this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, numDir, + racks, hosts, ugi, conf, numTrackerToExclude, new Clock()); + } + + public MiniMRCluster(int jobTrackerPort, int taskTrackerPort, + int numTaskTrackers, String namenode, int numDir, String[] racks, + String[] hosts, UserGroupInformation ugi, JobConf conf, + int numTrackerToExclude, Clock clock) throws IOException { + if (conf == null) conf = new JobConf(); + FileSystem.setDefaultUri(conf, namenode); + mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(), + numTaskTrackers, conf); + } + + public UserGroupInformation getUgi() { + throw new UnsupportedOperationException(); + } + + public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, + int max) throws IOException { + throw new UnsupportedOperationException(); + } + + public void setJobPriority(JobID jobId, JobPriority priority) + throws AccessControlException, IOException { + throw new UnsupportedOperationException(); + } + + public JobPriority getJobPriority(JobID jobId) { + throw new UnsupportedOperationException(); + } + + public long getJobFinishTime(JobID jobId) { + throw new UnsupportedOperationException(); + } + + public void initializeJob(JobID jobId) throws IOException { + throw new UnsupportedOperationException(); + } + + public MapTaskCompletionEventsUpdate getMapTaskCompletionEventsUpdates( + int index, JobID jobId, int max) throws IOException { + throw new UnsupportedOperationException(); + } + + public JobConf getJobTrackerConf() { + JobConf jobConf = null; + try { + jobConf = new JobConf(mrClientCluster.getConfig()); + } catch (IOException e) { + LOG.error(e); + } + return jobConf; + } + + public int getFaultCount(String hostName) { + throw new UnsupportedOperationException(); + } + + public void startJobTracker() { + // Do nothing + } + + public void startJobTracker(boolean wait) { + // Do nothing + } + + public void stopJobTracker() { + // Do nothing + } + + public void stopTaskTracker(int id) { + // Do nothing + } + + public void startTaskTracker(String host, String rack, int idx, int numDir) + throws IOException { + // Do nothing + } + + void addTaskTracker(TaskTrackerRunner taskTracker) { + throw new UnsupportedOperationException(); + } + + int getTaskTrackerID(String trackerName) { + throw new UnsupportedOperationException(); + } + + public void shutdown() { + try { + mrClientCluster.stop(); + } catch (IOException e) { + LOG.error(e); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java new file mode 100644 index 0000000000..81329a97c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRYarnClusterAdapter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; + +/** + * An adapter for MiniMRYarnCluster providing a MiniMRClientCluster interface. + * This interface could be used by tests across both MR1 and MR2. + */ +public class MiniMRYarnClusterAdapter implements MiniMRClientCluster { + + private MiniMRYarnCluster miniMRYarnCluster; + + public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) { + this.miniMRYarnCluster = miniMRYarnCluster; + } + + @Override + public Configuration getConfig() { + return miniMRYarnCluster.getConfig(); + } + + @Override + public void start() { + miniMRYarnCluster.start(); + } + + @Override + public void stop() { + miniMRYarnCluster.stop(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java new file mode 100644 index 0000000000..ddadac9900 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClientCluster.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Basic testing for the MiniMRClientCluster. 
This test shows an example class + * that can be used in MR1 or MR2, without any change to the test. The test will + * use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1. + */ +public class TestMiniMRClientCluster { + + private static Path inDir = null; + private static Path outDir = null; + private static Path testdir = null; + private static Path[] inFiles = new Path[5]; + private static MiniMRClientCluster mrCluster; + + @BeforeClass + public static void setup() throws IOException { + final Configuration conf = new Configuration(); + final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", + "/tmp")); + testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster"); + inDir = new Path(testdir, "in"); + outDir = new Path(testdir, "out"); + + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(testdir) && !fs.delete(testdir, true)) { + throw new IOException("Could not delete " + testdir); + } + if (!fs.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir); + } + + for (int i = 0; i < inFiles.length; i++) { + inFiles[i] = new Path(inDir, "part_" + i); + createFile(inFiles[i], conf); + } + + // create the mini cluster to be used for the tests + mrCluster = MiniMRClientClusterFactory.create( + TestMiniMRClientCluster.class, 1, new Configuration()); + } + + @AfterClass + public static void cleanup() throws IOException { + // clean up the input and output files + final Configuration conf = new Configuration(); + final FileSystem fs = testdir.getFileSystem(conf); + if (fs.exists(testdir)) { + fs.delete(testdir, true); + } + // stopping the mini cluster + mrCluster.stop(); + } + + @Test + public void testJob() throws Exception { + final Job job = createJob(); + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, + inDir); + org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, + new Path(outDir, "testJob")); + assertTrue(job.waitForCompletion(true)); + validateCounters(job.getCounters(), 5, 25, 5, 5); + } + + private void validateCounters(Counters counters, long mapInputRecords, + long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) { + assertEquals("MapInputRecords", mapInputRecords, counters.findCounter( + "MyCounterGroup", "MAP_INPUT_RECORDS").getValue()); + assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter( + "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue()); + assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter( + "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue()); + assertEquals("ReduceOutputRecords", reduceOutputRecords, counters + .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue()); + } + + private static void createFile(Path inFile, Configuration conf) + throws IOException { + final FileSystem fs = inFile.getFileSystem(conf); + if (fs.exists(inFile)) { + return; + } + FSDataOutputStream out = fs.create(inFile); + out.writeBytes("This is a test file"); + out.close(); + } + + public static Job createJob() throws IOException { + final Job baseJob = new Job(mrCluster.getConfig()); + baseJob.setOutputKeyClass(Text.class); + baseJob.setOutputValueClass(IntWritable.class); + baseJob.setMapperClass(MyMapper.class); + baseJob.setReducerClass(MyReducer.class); + baseJob.setNumReduceTasks(1); + return baseJob; + } + + public static class MyMapper extends + org.apache.hadoop.mapreduce.Mapper { + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text 
value, Context context) + throws IOException, InterruptedException { + context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1); + StringTokenizer iter = new StringTokenizer(value.toString()); + while (iter.hasMoreTokens()) { + word.set(iter.nextToken()); + context.write(word, one); + context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1); + } + } + } + + public static class MyReducer extends + org.apache.hadoop.mapreduce.Reducer { + private IntWritable result = new IntWritable(); + + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1); + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS") + .increment(1); + } + } + +} From ed45ad0fcc5f510717003667df79702a616c99fc Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Thu, 17 Nov 2011 22:58:44 +0000 Subject: [PATCH 02/10] MAPREDUCE-3415. improve MiniMRYarnCluster & DistributedShell JAR resolution (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203411 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/util/JarFinder.java | 146 ++++++++++++++++++ .../org/apache/hadoop/util/TestJarFinder.java | 42 +++++ hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-jobclient/pom.xml | 7 - .../mapred/MiniMRClientClusterFactory.java | 4 +- .../mapreduce/v2/MiniMRYarnCluster.java | 11 +- .../pom.xml | 5 +- .../TestDistributedShell.java | 5 +- 8 files changed, 202 insertions(+), 21 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJarFinder.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java new file mode 100644 index 0000000000..37a561eaf7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java @@ -0,0 +1,146 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package org.apache.hadoop.util; + +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLDecoder; +import java.text.MessageFormat; +import java.util.Enumeration; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +/** + * Finds the Jar for a class. 
If the class is in a directory in the + * classpath, it creates a Jar on the fly with the contents of the directory + * and returns the path to that Jar. If a Jar is created, it is created in + * the system temporary directory. + */ +public class JarFinder { + + private static void zipDir(File dir, String relativePath, ZipOutputStream zos) + throws IOException { + Preconditions.checkNotNull(relativePath, "relativePath"); + Preconditions.checkNotNull(zos, "zos"); + zipDir(dir, relativePath, zos, true); + zos.close(); + } + + private static void zipDir(File dir, String relativePath, ZipOutputStream zos, + boolean start) throws IOException { + String[] dirList = dir.list(); + for (String aDirList : dirList) { + File f = new File(dir, aDirList); + if (!f.isHidden()) { + if (f.isDirectory()) { + if (!start) { + ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/"); + zos.putNextEntry(dirEntry); + zos.closeEntry(); + } + String filePath = f.getPath(); + File file = new File(filePath); + zipDir(file, relativePath + f.getName() + "/", zos, false); + } + else { + ZipEntry anEntry = new ZipEntry(relativePath + f.getName()); + zos.putNextEntry(anEntry); + InputStream is = new FileInputStream(f); + byte[] arr = new byte[4096]; + int read = is.read(arr); + while (read > -1) { + zos.write(arr, 0, read); + read = is.read(arr); + } + is.close(); + zos.closeEntry(); + } + } + } + } + + private static void createJar(File dir, File jarFile) throws IOException { + Preconditions.checkNotNull(dir, "dir"); + Preconditions.checkNotNull(jarFile, "jarFile"); + File jarDir = jarFile.getParentFile(); + if (!jarDir.exists()) { + if (!jarDir.mkdirs()) { + throw new IOException(MessageFormat.format("could not create dir [{0}]", + jarDir)); + } + } + JarOutputStream zos = new JarOutputStream(new FileOutputStream(jarFile), + new Manifest()); + zipDir(dir, "", zos); + } + + /** + * Returns the full path to the Jar containing the class. It always return a + * JAR. + * + * @param klass class. + * + * @return path to the Jar containing the class. 
+ */ + public static String getJar(Class klass) { + Preconditions.checkNotNull(klass, "klass"); + ClassLoader loader = klass.getClassLoader(); + if (loader != null) { + String class_file = klass.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(class_file); + itr.hasMoreElements(); ) { + URL url = (URL) itr.nextElement(); + String path = url.getPath(); + if (path.startsWith("file:")) { + path = path.substring("file:".length()); + } + path = URLDecoder.decode(path, "UTF-8"); + if ("jar".equals(url.getProtocol())) { + path = URLDecoder.decode(path, "UTF-8"); + return path.replaceAll("!.*$", ""); + } + else if ("file".equals(url.getProtocol())) { + String klassName = klass.getName(); + klassName = klassName.replace(".", "/") + ".class"; + path = path.substring(0, path.length() - klassName.length()); + File baseDir = new File(path); + File testDir = new File(System.getProperty("test.build.dir", "target/test-dir")); + testDir = testDir.getAbsoluteFile(); + if (!testDir.exists()) { + testDir.mkdirs(); + } + File tempJar = File.createTempFile("hadoop-", "", testDir); + tempJar = new File(tempJar.getAbsolutePath() + ".jar"); + createJar(baseDir, tempJar); + return tempJar.getAbsolutePath(); + } + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJarFinder.java new file mode 100644 index 0000000000..a311a9f712 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJarFinder.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.commons.logging.LogFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; + +public class TestJarFinder { + + @Test + public void testAppend() throws Exception { + + //picking a class that is for sure in a JAR in the classpath + String jar = JarFinder.getJar(LogFactory.class); + Assert.assertTrue(new File(jar).exists()); + + //picking a class that is for sure in a directory in the classpath + //in this case the JAR is created on the fly + jar = JarFinder.getJar(TestJarFinder.class); + Assert.assertTrue(new File(jar).exists()); + } + +} diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f64af4a4a2..d2f0e190e5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -37,6 +37,9 @@ Trunk (unreleased changes) MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs cross MR1 and MR2. 
(Ahmed via tucu) + MAPREDUCE-3415. improve MiniMRYarnCluster & DistributedShell JAR resolution. + (tucu) + BUG FIXES MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null. (amarrk) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index d1f06ce9b1..29007bd4fd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -101,16 +101,9 @@ org.apache.maven.plugins maven-surefire-plugin - - ${project.parent.basedir}/hadoop-mapreduce-client-app/target/hadoop-mapreduce-client-app-${project.version}.jar - ${java.home} - - - ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar - diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java index 3e4b34b556..7ecfc67164 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.util.JarFinder; /** * A MiniMRCluster factory. 
In MR2, it provides a wrapper MiniMRClientCluster @@ -57,7 +58,8 @@ public static MiniMRClientCluster create(Class caller, int noOfNMs, Job job = Job.getInstance(conf); job.addFileToClassPath(appJar); - job.setJarByClass(caller); + String callerJar = JarFinder.getJar(caller); + job.setJar(callerJar); MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller .getName(), noOfNMs); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java index d4d9bb8c3a..845d64f800 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java @@ -24,13 +24,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.ShuffleHandler; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -45,8 +45,7 @@ */ public class MiniMRYarnCluster extends MiniYARNCluster { - public static final String APPJAR = System.getProperty("yarn.mr.jar", JobConf - .findContainingJar(LocalContainerLauncher.class)); + public static final String APPJAR = JarFinder.getJar(LocalContainerLauncher.class); private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class); private JobHistoryServer historyServer; @@ -55,7 +54,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster { public MiniMRYarnCluster(String testName) { this(testName, 1); } - + public MiniMRYarnCluster(String testName, int noOfNMs) { super(testName, noOfNMs); //TODO: add the history server @@ -88,9 +87,9 @@ public void init(Configuration conf) { conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); // Set config for JH Server - conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, + conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS); - + super.init(conf); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index a6aff52546..a56d2b6a54 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -82,7 +82,7 @@ jar - test-compile @@ -109,9 +109,6 @@ org.apache.maven.plugins maven-surefire-plugin - - ${project.build.directory}/${project.artifactId}-${project.version}.jar - ${java.home} diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 78d9869ef5..472c959eda 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -37,8 +37,7 @@ public class TestDistributedShell { protected static MiniYARNCluster yarnCluster = null; protected static Configuration conf = new Configuration(); - protected static String APPMASTER_JAR = System.getProperty("yarn.ds.jar", - JobConf.findContainingJar(ApplicationMaster.class)); + protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @BeforeClass public static void setup() throws InterruptedException, IOException { From 7edfff57954b382926d7c771dc13f0e8a70f215f Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 00:19:53 +0000 Subject: [PATCH 03/10] =?UTF-8?q?HADOOP-7787.=20Make=20source=20tarball=20?= =?UTF-8?q?use=20conventional=20name.=20Contributed=20by=20Bruno=20Mah?= =?UTF-8?q?=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203437 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ pom.xml | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 77fa3c68b6..74d683b505 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -117,6 +117,9 @@ Release 0.23.1 - Unreleased HADOOP-7811. TestUserGroupInformation#testGetServerSideGroups test fails in chroot. (Jonathan Eagles via mahadev) + HADOOP-7787. Make source tarball use conventional name. + (Bruno Mahé via tomwhite) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/pom.xml b/pom.xml index 23b9d69157..f960b8d0b5 100644 --- a/pom.xml +++ b/pom.xml @@ -264,7 +264,7 @@ false false - hadoop-dist-${project.version}-src + hadoop-${project.version}-src hadoop-dist/target @@ -288,7 +288,7 @@ - Hadoop source tar available at: ${basedir}/hadoop-dist/target/hadoop-dist-${project.version}-src.tar.gz + Hadoop source tar available at: ${basedir}/hadoop-dist/target/hadoop-${project.version}-src.tar.gz From 0864ef19089f703232107d8aa26c4a7571ff132e Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 18 Nov 2011 00:45:31 +0000 Subject: [PATCH 04/10] HDFS-2560. Refactor BPOfferService to be a static inner class. Contributed by Todd Lipcon. 
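For readers unfamiliar with the pattern this change applies, a minimal sketch of a static nested service class holding an explicit back-reference to its parent DataNode might look like the following. This is an illustrative simplification, not part of the patch itself: the field and method set is reduced, and the namenode address argument is omitted.

    // Illustrative only -- simplified from the actual BPOfferService below.
    class DataNode {
      volatile boolean shouldRun = true;

      // After the refactor, the offer service is a static nested class that
      // receives the owning DataNode explicitly instead of capturing it
      // implicitly, as a non-static inner class would.
      static class BPOfferService implements Runnable {
        private final DataNode dn;
        private volatile boolean shouldServiceRun = true;

        BPOfferService(DataNode dn) {
          this.dn = dn;
        }

        @Override
        public void run() {
          while (dn.shouldRun && shouldServiceRun) {
            // per-namenode work: handshake, register, heartbeats, block reports
          }
        }
      }
    }

Making the dependency on DataNode state explicit in this way is what the larger diff below does for the real class, moving cluster-wide concerns (storage ID checks, secret manager registration, block pool setup and teardown) back into DataNode itself.
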
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203444 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 1 + .../block/BlockPoolTokenSecretManager.java | 4 + .../hadoop/hdfs/server/datanode/DataNode.java | 365 ++++++++++-------- .../server/datanode/DataNodeTestUtils.java | 2 +- 4 files changed, 212 insertions(+), 160 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f9ed841355..0e5147011e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -107,6 +107,7 @@ Release 0.23.1 - UNRELEASED NEW FEATURES IMPROVEMENTS + HDFS-2560. Refactor BPOfferService to be a static inner class (todd) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 2cb8b41ffd..05fba79fe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -55,6 +55,10 @@ synchronized BlockTokenSecretManager get(String bpid) { } return secretMgr; } + + public synchronized boolean isBlockPoolRegistered(String bpid) { + return map.containsKey(bpid); + } /** Return an empty BlockTokenIdentifer */ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d496c6a2cc..32fe56c63f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -136,6 +136,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; @@ -275,7 +276,7 @@ class BlockPoolManager { List isas = DFSUtil.getNNServiceRpcAddresses(conf); for(InetSocketAddress isa : isas) { - BPOfferService bpos = new BPOfferService(isa); + BPOfferService bpos = new BPOfferService(isa, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } } @@ -373,19 +374,19 @@ void refreshNamenodes(Configuration conf) } for (InetSocketAddress nnaddr : toStart) { - BPOfferService bpos = new BPOfferService(nnaddr); + BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); } - - for (BPOfferService bpos : toShutdown) { - remove(bpos); - } } for (BPOfferService bpos : toShutdown) { bpos.stop(); bpos.join(); } + + // stoping the BPOSes causes them to call remove() on their own when they + // clean up. + // Now start the threads that are not already running. 
startAll(); } @@ -402,9 +403,7 @@ void refreshNamenodes(Configuration conf) Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; long blockReportInterval; - boolean resetBlockReportTime = true; long deleteReportInterval; - long lastDeletedReport = 0; long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; long heartBeatInterval; private boolean heartbeatsDisabledForTests = false; @@ -653,6 +652,7 @@ private synchronized void initDataBlockScanner(Configuration conf) { return; } String reason = null; + assert data != null; if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { reason = "verification is turned off by configuration"; @@ -774,11 +774,15 @@ void setHeartbeatsDisabledForTests( * */ @InterfaceAudience.Private - class BPOfferService implements Runnable { + static class BPOfferService implements Runnable { final InetSocketAddress nnAddr; DatanodeRegistration bpRegistration; NamespaceInfo bpNSInfo; long lastBlockReport = 0; + long lastDeletedReport = 0; + + boolean resetBlockReportTime = true; + private Thread bpThread; private DatanodeProtocol bpNamenode; private String blockPoolId; @@ -788,14 +792,13 @@ class BPOfferService implements Runnable { = new LinkedList(); private volatile int pendingReceivedRequests = 0; private volatile boolean shouldServiceRun = true; - private boolean isBlockTokenInitialized = false; UpgradeManagerDatanode upgradeManager = null; + private final DataNode dn; - BPOfferService(InetSocketAddress isa) { - this.bpRegistration = new DatanodeRegistration(getMachineName()); - bpRegistration.setInfoPort(infoServer.getPort()); - bpRegistration.setIpcPort(getIpcPort()); - this.nnAddr = isa; + BPOfferService(InetSocketAddress nnAddr, DataNode dn) { + this.dn = dn; + this.bpRegistration = dn.createRegistration(); + this.nnAddr = nnAddr; } /** @@ -822,7 +825,6 @@ private InetSocketAddress getNNSocketAddress() { void setNamespaceInfo(NamespaceInfo nsinfo) { bpNSInfo = nsinfo; this.blockPoolId = nsinfo.getBlockPoolID(); - blockPoolManager.addBlockPool(this); } void setNameNode(DatanodeProtocol dnProtocol) { @@ -831,7 +833,7 @@ void setNameNode(DatanodeProtocol dnProtocol) { private NamespaceInfo handshake() throws IOException { NamespaceInfo nsInfo = new NamespaceInfo(); - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { nsInfo = bpNamenode.versionRequest(); // verify build version @@ -867,7 +869,7 @@ private NamespaceInfo handshake() throws IOException { return nsInfo; } - void setupBP(Configuration conf, AbstractList dataDirs) + void setupBP(Configuration conf) throws IOException { // get NN proxy DatanodeProtocol dnp = @@ -878,52 +880,19 @@ void setupBP(Configuration conf, AbstractList dataDirs) // handshake with NN NamespaceInfo nsInfo = handshake(); setNamespaceInfo(nsInfo); - synchronized(DataNode.this) { - // we do not allow namenode from different cluster to register - if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { - throw new IOException( - "cannot register with the namenode because clusterid do not match:" - + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + - ";dn cid=" + clusterId); - } - - setupBPStorage(); - - setClusterId(nsInfo.clusterID); - } - - initPeriodicScanners(conf); - } - - void setupBPStorage() throws IOException { - StartupOption startOpt = getStartupOption(conf); - assert startOpt != null : "Startup option must be set."; - - boolean simulatedFSDataset = conf.getBoolean( - 
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, - DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + dn.initBlockPool(this, nsInfo); - if (simulatedFSDataset) { - initFsDataSet(conf, dataDirs); - bpRegistration.setStorageID(getStorageId()); //same as DN + bpRegistration.setStorageID(dn.getStorageId()); + StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId); + if (storageInfo == null) { + // it's null in the case of SimulatedDataSet bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; - bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID; - bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID; + bpRegistration.setStorageInfo(nsInfo); } else { - // read storage info, lock data dirs and transition fs state if necessary - storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo, - dataDirs, startOpt); - LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid=" - + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo=" - + bpNSInfo); - - bpRegistration.setStorageID(getStorageId()); - bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId)); - initFsDataSet(conf, dataDirs); + bpRegistration.setStorageInfo(storageInfo); } - data.addBlockPool(blockPoolId, conf); } - + /** * This methods arranges for the data node to send the block report at * the next heartbeat. @@ -931,9 +900,9 @@ void setupBPStorage() throws IOException { void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - blockReportInterval; + lastBlockReport = lastHeartbeat - dn.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -1038,11 +1007,11 @@ DatanodeCommand blockReport() throws IOException { // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > blockReportInterval) { + if (startTime - lastBlockReport > dn.blockReportInterval) { // Create block report long brCreateStartTime = now(); - BlockListAsLongs bReport = data.getBlockReport(blockPoolId); + BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId); // Send block report long brSendStartTime = now(); @@ -1052,7 +1021,7 @@ DatanodeCommand blockReport() throws IOException { // Log the block report processing stats from Datanode perspective long brSendCost = now() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; - metrics.addBlockReport(brSendCost); + dn.metrics.addBlockReport(brSendCost); LOG.info("BlockReport of " + bReport.getNumberOfBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"); @@ -1060,7 +1029,7 @@ DatanodeCommand blockReport() throws IOException { // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. 
The current report @@ -1070,7 +1039,7 @@ DatanodeCommand blockReport() throws IOException { * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - blockReportInterval * blockReportInterval; + dn.blockReportInterval * dn.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1080,12 +1049,12 @@ DatanodeCommand blockReport() throws IOException { DatanodeCommand [] sendHeartBeat() throws IOException { return bpNamenode.sendHeartbeat(bpRegistration, - data.getCapacity(), - data.getDfsUsed(), - data.getRemaining(), - data.getBlockPoolUsed(blockPoolId), - xmitsInProgress.get(), - getXceiverCount(), data.getNumFailedVolumes()); + dn.data.getCapacity(), + dn.data.getDfsUsed(), + dn.data.getRemaining(), + dn.data.getBlockPoolUsed(blockPoolId), + dn.xmitsInProgress.get(), + dn.getXceiverCount(), dn.data.getNumFailedVolumes()); } //This must be called only by blockPoolManager @@ -1121,21 +1090,9 @@ private synchronized void cleanUp() { if(upgradeManager != null) upgradeManager.shutdownUpgrade(); - - blockPoolManager.remove(this); shouldServiceRun = false; RPC.stopProxy(bpNamenode); - if (blockScanner != null) { - blockScanner.removeBlockPool(this.getBlockPoolId()); - } - - if (data != null) { - data.shutdownBlockPool(this.getBlockPoolId()); - } - - if (storage != null) { - storage.removeBlockPoolStorage(this.getBlockPoolId()); - } + dn.shutdownBlockPool(this); } /** @@ -1144,22 +1101,22 @@ private synchronized void cleanUp() { */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " - + deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " - + blockReportInterval + "msec" + " Initial delay: " - + initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + heartBeatInterval); + + dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + + dn.blockReportInterval + "msec" + " Initial delay: " + + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dn.heartBeatInterval); // // Now loop for a long time.... 
// - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { long startTime = now(); // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > heartBeatInterval) { + if (startTime - lastHeartbeat > dn.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1168,9 +1125,9 @@ private void offerService() throws Exception { // -- Bytes remaining // lastHeartbeat = startTime; - if (!heartbeatsDisabledForTests) { + if (!dn.heartbeatsDisabledForTests) { DatanodeCommand[] cmds = sendHeartBeat(); - metrics.addHeartbeat(now() - startTime); + dn.metrics.addHeartbeat(now() - startTime); long startProcessCommands = now(); if (!processCommand(cmds)) @@ -1183,7 +1140,7 @@ private void offerService() throws Exception { } } if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > deleteReportInterval)) { + || (startTime - lastDeletedReport > dn.deleteReportInterval)) { reportReceivedDeletedBlocks(); lastDeletedReport = startTime; } @@ -1192,15 +1149,15 @@ private void offerService() throws Exception { processCommand(cmd); // Now safe to start scanning the block pool - if (blockScanner != null) { - blockScanner.addBlockPool(this.blockPoolId); + if (dn.blockScanner != null) { + dn.blockScanner.addBlockPool(this.blockPoolId); } // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = heartBeatInterval - + long waitTime = dn.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedAndDeletedBlockList) { if (waitTime > 0 && pendingReceivedRequests == 0) { @@ -1223,7 +1180,7 @@ private void offerService() throws Exception { } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, heartBeatInterval); + long sleepTime = Math.min(1000, dn.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1269,7 +1226,7 @@ void register() throws IOException { (bpNSInfo.getLayoutVersion(), "namenode"); } - while(shouldRun && shouldServiceRun) { + while(dn.shouldRun && shouldServiceRun) { try { // Use returned registration from namenode with updated machine name. bpRegistration = bpNamenode.registerDatanode(bpRegistration); @@ -1277,8 +1234,6 @@ void register() throws IOException { LOG.info("bpReg after =" + bpRegistration.storageInfo + ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName()); - NetUtils.getHostname(); - hostName = bpRegistration.getHost(); break; } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting to server: " + nnAddr); @@ -1287,47 +1242,13 @@ void register() throws IOException { } catch (InterruptedException ie) {} } } - - if (storage.getStorageID().equals("")) { - storage.setStorageID(bpRegistration.getStorageID()); - storage.writeAll(); - LOG.info("New storage id " + bpRegistration.getStorageID() - + " is assigned to data-node " + bpRegistration.getName()); - } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { - throw new IOException("Inconsistent storage IDs. Name-node returned " - + bpRegistration.getStorageID() - + ". 
Expecting " + storage.getStorageID()); - } - - if (!isBlockTokenInitialized) { - /* first time registering with NN */ - ExportedBlockKeys keys = bpRegistration.exportedKeys; - isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: for block pool " + - blockPoolId + " keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) - + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) - + " min(s)"); - final BlockTokenSecretManager secretMgr = - new BlockTokenSecretManager(false, 0, blockTokenLifetime); - blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); - } - isBlockTokenInitialized = true; - } - - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, - bpRegistration.exportedKeys); - bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; - } + + dn.bpRegistrationSucceeded(bpRegistration, blockPoolId); LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(initialBlockReportDelay); + scheduleBlockReport(dn.initialBlockReportDelay); } @@ -1341,14 +1262,14 @@ void register() throws IOException { */ @Override public void run() { - LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data + ";bp=" + blockPoolId); try { // init stuff try { // setup storage - setupBP(conf, dataDirs); + setupBP(dn.conf); register(); } catch (IOException ioe) { // Initial handshake, storage recovery or registration failed @@ -1360,13 +1281,13 @@ public void run() { initialized = true; // bp is initialized; - while (shouldRun && shouldServiceRun) { + while (dn.shouldRun && shouldServiceRun) { try { startDistributedUpgradeIfNeeded(); offerService(); } catch (Exception ex) { LOG.error("Exception in BPOfferService", ex); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && shouldServiceRun) { try { Thread.sleep(5000); } catch (InterruptedException ie) { @@ -1379,7 +1300,7 @@ public void run() { LOG.warn("Unexpected exception", ex); } finally { LOG.warn(bpRegistration + " ending block pool service for: " - + blockPoolId); + + blockPoolId + " thread " + Thread.currentThread().getId()); cleanUp(); } } @@ -1420,8 +1341,8 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); - metrics.incrBlocksReplicated(bcmd.getBlocks().length); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: // @@ -1430,16 +1351,16 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { // Block toDelete[] = bcmd.getBlocks(); try { - if (blockScanner != null) { - blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); + if (dn.blockScanner != null) { + dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); } // using global fsdataset - data.invalidate(bcmd.getBlockPoolId(), toDelete); + dn.data.invalidate(bcmd.getBlockPoolId(), toDelete); } catch(IOException e) { - checkDiskError(); + dn.checkDiskError(); throw e; } - metrics.incrBlocksRemoved(toDelete.length); + 
dn.metrics.incrBlocksRemoved(toDelete.length); break; case DatanodeProtocol.DNA_SHUTDOWN: // shut down the data node @@ -1448,12 +1369,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (shouldRun && shouldServiceRun) { + if (dn.shouldRun && shouldServiceRun) { register(); } break; case DatanodeProtocol.DNA_FINALIZE: - storage.finalizeUpgrade(((FinalizeCommand) cmd) + dn.storage.finalizeUpgrade(((FinalizeCommand) cmd) .getBlockPoolId()); break; case UpgradeCommand.UC_ACTION_START_UPGRADE: @@ -1461,12 +1382,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { processDistributedUpgradeCommand((UpgradeCommand)cmd); break; case DatanodeProtocol.DNA_RECOVERBLOCK: - recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); + dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks()); break; case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); - if (isBlockTokenEnabled) { - blockPoolTokenSecretManager.setKeys(blockPoolId, + if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.setKeys(blockPoolId, ((KeyUpdateCommand) cmd).getExportedKeys()); } break; @@ -1476,7 +1397,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException { ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue(); if (bandwidth > 0) { DataXceiverServer dxcs = - (DataXceiverServer) dataXceiverServer.getRunnable(); + (DataXceiverServer) dn.dataXceiverServer.getRunnable(); dxcs.balanceThrottler.setBandwidth(bandwidth); } break; @@ -1495,7 +1416,7 @@ private void processDistributedUpgradeCommand(UpgradeCommand comm) synchronized UpgradeManagerDatanode getUpgradeManager() { if(upgradeManager == null) upgradeManager = - new UpgradeManagerDatanode(DataNode.this, blockPoolId); + new UpgradeManagerDatanode(dn, blockPoolId); return upgradeManager; } @@ -1555,6 +1476,133 @@ void startDataNode(Configuration conf, blockPoolManager = new BlockPoolManager(conf); } + /** + * Check that the registration returned from a NameNode is consistent + * with the information in the storage. If the storage is fresh/unformatted, + * sets the storage ID based on this registration. + * Also updates the block pool's state in the secret manager. + */ + private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration, + String blockPoolId) + throws IOException { + hostName = bpRegistration.getHost(); + + if (storage.getStorageID().equals("")) { + // This is a fresh datanode -- take the storage ID provided by the + // NN and persist it. + storage.setStorageID(bpRegistration.getStorageID()); + storage.writeAll(); + LOG.info("New storage id " + bpRegistration.getStorageID() + + " is assigned to data-node " + bpRegistration.getName()); + } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) { + throw new IOException("Inconsistent storage IDs. Name-node returned " + + bpRegistration.getStorageID() + + ". Expecting " + storage.getStorageID()); + } + + registerBlockPoolWithSecretManager(bpRegistration, blockPoolId); + } + + /** + * After the block pool has contacted the NN, registers that block pool + * with the secret manager, updating it with the secrets provided by the NN. 
+ * @param bpRegistration + * @param blockPoolId + * @throws IOException + */ + private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration, + String blockPoolId) throws IOException { + ExportedBlockKeys keys = bpRegistration.exportedKeys; + isBlockTokenEnabled = keys.isBlockTokenEnabled(); + // TODO should we check that all federated nns are either enabled or + // disabled? + if (!isBlockTokenEnabled) return; + + if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) { + long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); + long blockTokenLifetime = keys.getTokenLifetime(); + LOG.info("Block token params received from NN: for block pool " + + blockPoolId + " keyUpdateInterval=" + + blockKeyUpdateInterval / (60 * 1000) + + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + + " min(s)"); + final BlockTokenSecretManager secretMgr = + new BlockTokenSecretManager(false, 0, blockTokenLifetime); + blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); + } + + blockPoolTokenSecretManager.setKeys(blockPoolId, + bpRegistration.exportedKeys); + bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS; + } + + /** + * Remove the given block pool from the block scanner, dataset, and storage. + */ + private void shutdownBlockPool(BPOfferService bpos) { + blockPoolManager.remove(bpos); + + String bpId = bpos.getBlockPoolId(); + if (blockScanner != null) { + blockScanner.removeBlockPool(bpId); + } + + if (data != null) { + data.shutdownBlockPool(bpId); + } + + if (storage != null) { + storage.removeBlockPoolStorage(bpId); + } + } + + void initBlockPool(BPOfferService bpOfferService, + NamespaceInfo nsInfo) throws IOException { + String blockPoolId = nsInfo.getBlockPoolID(); + + blockPoolManager.addBlockPool(bpOfferService); + + synchronized (this) { + // we do not allow namenode from different cluster to register + if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) { + throw new IOException( + "cannot register with the namenode because clusterid do not match:" + + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + + ";dn cid=" + clusterId); + } + + setClusterId(nsInfo.clusterID); + } + + StartupOption startOpt = getStartupOption(conf); + assert startOpt != null : "Startup option must be set."; + + boolean simulatedFSDataset = conf.getBoolean( + DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, + DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT); + + if (!simulatedFSDataset) { + // read storage info, lock data dirs and transition fs state if necessary + storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo, + dataDirs, startOpt); + StorageInfo bpStorage = storage.getBPStorage(blockPoolId); + LOG.info("setting up storage: nsid=" + + bpStorage.getNamespaceID() + ";bpid=" + + blockPoolId + ";lv=" + storage.getLayoutVersion() + + ";nsInfo=" + nsInfo); + } + initFsDataSet(); + initPeriodicScanners(conf); + data.addBlockPool(nsInfo.getBlockPoolID(), conf); + } + + private DatanodeRegistration createRegistration() { + DatanodeRegistration reg = new DatanodeRegistration(getMachineName()); + reg.setInfoPort(infoServer.getPort()); + reg.setIpcPort(getIpcPort()); + return reg; + } + BPOfferService[] getAllBpOs() { return blockPoolManager.getAllNamenodeThreads(); } @@ -1567,8 +1615,7 @@ int getBpOsCount() { * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. 
*/ - private synchronized void initFsDataSet(Configuration conf, - AbstractList dataDirs) throws IOException { + private synchronized void initFsDataSet() throws IOException { if (data != null) { // Already initialized return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 4d7740455f..a3d47b623e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -61,7 +61,7 @@ public static void setBPNamenodeByIndex(DataNode dn, bpos.setNamespaceInfo(nsifno); dn.setBPNamenode(bpid, nn); - bpos.setupBPStorage(); + dn.initBlockPool(bpos, nsifno); } } } From 41cfb68419a563260aa2c99be335e8bb57be89d0 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 00:48:54 +0000 Subject: [PATCH 05/10] =?UTF-8?q?HADOOP-7802.=20Hadoop=20scripts=20uncondi?= =?UTF-8?q?tionally=20source=20"$bin"/../libexec/hadoop-config.sh.=20Contr?= =?UTF-8?q?ibuted=20by=20Bruno=20Mah=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203449 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ hadoop-common-project/hadoop-common/src/main/bin/hadoop | 4 +++- .../hadoop-common/src/main/bin/hadoop-daemon.sh | 4 +++- .../hadoop-common/src/main/bin/hadoop-daemons.sh | 4 +++- hadoop-common-project/hadoop-common/src/main/bin/rcc | 4 +++- hadoop-common-project/hadoop-common/src/main/bin/slaves.sh | 4 +++- .../hadoop-common/src/main/bin/start-all.sh | 4 +++- hadoop-common-project/hadoop-common/src/main/bin/stop-all.sh | 4 +++- .../hadoop-common/src/main/packages/hadoop-create-user.sh | 4 +++- .../src/main/packages/hadoop-setup-applications.sh | 4 +++- .../hadoop-common/src/main/packages/hadoop-setup-conf.sh | 5 ++++- .../hadoop-common/src/main/packages/hadoop-setup-hdfs.sh | 4 +++- .../src/main/packages/hadoop-setup-single-node.sh | 4 +++- .../hadoop-common/src/main/packages/hadoop-validate-setup.sh | 4 +++- 14 files changed, 43 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 74d683b505..ad6db0ff84 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -110,6 +110,9 @@ Release 0.23.1 - Unreleased HADOOP-7801. HADOOP_PREFIX cannot be overriden. (Bruno Mahé via tomwhite) + HADOOP-7802. Hadoop scripts unconditionally source + "$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop b/hadoop-common-project/hadoop-common/src/main/bin/hadoop index a3bed69247..9e92b5c2ce 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop @@ -21,7 +21,9 @@ bin=`which $0` bin=`dirname ${bin}` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. 
$HADOOP_LIBEXEC_DIR/hadoop-config.sh function print_usage(){ echo "Usage: hadoop [--config confdir] COMMAND" diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh index 01aaad4298..a843c93cbf 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh @@ -39,7 +39,9 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh # get arguments diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemons.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemons.sh index 08c7e23ac9..181d7ac101 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemons.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemons.sh @@ -29,6 +29,8 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh exec "$bin/slaves.sh" --config $HADOOP_CONF_DIR cd "$HADOOP_PREFIX" \; "$bin/hadoop-daemon.sh" --config $HADOOP_CONF_DIR "$@" diff --git a/hadoop-common-project/hadoop-common/src/main/bin/rcc b/hadoop-common-project/hadoop-common/src/main/bin/rcc index ebeebd0ee2..5f75b7c950 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/rcc +++ b/hadoop-common-project/hadoop-common/src/main/bin/rcc @@ -30,7 +30,9 @@ bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then . "${HADOOP_CONF_DIR}/hadoop-env.sh" diff --git a/hadoop-common-project/hadoop-common/src/main/bin/slaves.sh b/hadoop-common-project/hadoop-common/src/main/bin/slaves.sh index 153f4416f7..016392fedb 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/slaves.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/slaves.sh @@ -38,7 +38,9 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh if [ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]; then . "${HADOOP_CONF_DIR}/hadoop-env.sh" diff --git a/hadoop-common-project/hadoop-common/src/main/bin/start-all.sh b/hadoop-common-project/hadoop-common/src/main/bin/start-all.sh index 57fb3d6c74..f91d9afef0 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/start-all.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/start-all.sh @@ -23,7 +23,9 @@ echo "This script is Deprecated. Instead use start-dfs.sh and start-mapred.sh" bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. 
$HADOOP_LIBEXEC_DIR/hadoop-config.sh # start hdfs daemons if hdfs is present if [ -f "${HADOOP_HDFS_HOME}"/bin/start-dfs.sh ]; then diff --git a/hadoop-common-project/hadoop-common/src/main/bin/stop-all.sh b/hadoop-common-project/hadoop-common/src/main/bin/stop-all.sh index a2b5ddb716..7d8bd591ea 100755 --- a/hadoop-common-project/hadoop-common/src/main/bin/stop-all.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/stop-all.sh @@ -23,7 +23,9 @@ echo "This script is Deprecated. Instead use stop-dfs.sh and stop-mapred.sh" bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh # stop hdfs daemons if hdfs is present if [ -f "${HADOOP_HDFS_HOME}"/bin/stop-dfs.sh ]; then diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-create-user.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-create-user.sh index a5d4304af5..ad8ab35aa9 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-create-user.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-create-user.sh @@ -24,7 +24,9 @@ if [ "$HADOOP_HOME" != "" ]; then echo fi -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh usage() { echo " diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-applications.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-applications.sh index 521c1599c3..092e50d468 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-applications.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-applications.sh @@ -19,7 +19,9 @@ bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P) script="$(basename -- "$this")" this="$bin/$script" -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh usage() { echo " diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh index de47ce2716..0daac37942 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh @@ -504,7 +504,10 @@ if [ "${AUTOSETUP}" == "1" -o "${AUTOSETUP}" == "y" ]; then fi chmod 755 -R ${HADOOP_PREFIX}/sbin/*hadoop* chmod 755 -R ${HADOOP_PREFIX}/bin/hadoop - chmod 755 -R ${HADOOP_PREFIX}/libexec/hadoop-config.sh + + HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-${HADOOP_PREFIX}/libexec} + chmod 755 -R ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh + mkdir -p /home/${HADOOP_MR_USER} chown ${HADOOP_MR_USER}:${HADOOP_GROUP} /home/${HADOOP_MR_USER} HDFS_DIR=`echo ${HADOOP_HDFS_DIR} | sed -e 's/,/ /g'` diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-hdfs.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-hdfs.sh index fc4a7325c2..d2e8f1f69f 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-hdfs.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-hdfs.sh @@ -18,7 +18,9 @@ bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. 
"$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh usage() { echo " diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-single-node.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-single-node.sh index 5b47e4431c..845f815f49 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-single-node.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-single-node.sh @@ -25,7 +25,9 @@ if [ "$HADOOP_HOME" != "" ]; then echo fi -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh usage() { echo " diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-validate-setup.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-validate-setup.sh index 5d3aa1461e..35d8cb82fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-validate-setup.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-validate-setup.sh @@ -31,7 +31,9 @@ bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh usage() { echo " From dc5a0ab2453494cbb6cbf9ac18e0cc15c877223b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 00:53:46 +0000 Subject: [PATCH 06/10] =?UTF-8?q?HDFS-2544.=20Hadoop=20scripts=20unconditi?= =?UTF-8?q?onally=20source=20"$bin"/../libexec/hadoop-config.sh.=20Contrib?= =?UTF-8?q?uted=20by=20Bruno=20Mah=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203452 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../hadoop-hdfs/src/main/bin/distribute-exclude.sh | 4 +++- hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs | 4 +++- hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh | 6 ++++-- .../hadoop-hdfs/src/main/bin/refresh-namenodes.sh | 4 +++- .../hadoop-hdfs/src/main/bin/start-balancer.sh | 4 +++- hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh | 4 +++- .../hadoop-hdfs/src/main/bin/start-secure-dns.sh | 4 +++- .../hadoop-hdfs/src/main/bin/stop-balancer.sh | 4 +++- hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh | 4 +++- .../hadoop-hdfs/src/main/bin/stop-secure-dns.sh | 4 +++- 11 files changed, 34 insertions(+), 11 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0e5147011e..fe21d840be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -109,6 +109,9 @@ Release 0.23.1 - UNRELEASED IMPROVEMENTS HDFS-2560. Refactor BPOfferService to be a static inner class (todd) + HDFS-2544. Hadoop scripts unconditionally source + "$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. 
(todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh index cc538f72d3..66fc14a246 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/distribute-exclude.sh @@ -36,7 +36,9 @@ bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin/../libexec/hdfs-config.sh" +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh if [ "$1" = '' ] ; then "Error: please specify local exclude file as a first argument" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index d9b8f61abc..314fac8fd8 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -19,7 +19,9 @@ bin=`which $0` bin=`dirname ${bin}` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh function print_usage(){ echo "Usage: hdfs [--config confdir] COMMAND" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh index 48aa20c94d..09eec6e5de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh @@ -24,8 +24,10 @@ bin=`cd "$bin"; pwd` export HADOOP_PREFIX="${HADOOP_PREFIX:-$bin/..}" -if [ -e "$bin/../libexec/hadoop-config.sh" ]; then - . $bin/../libexec/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then + . ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then . "$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/refresh-namenodes.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/refresh-namenodes.sh index 2092764a1d..d3f67598b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/refresh-namenodes.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/refresh-namenodes.sh @@ -23,7 +23,9 @@ bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin/../libexec/hdfs-config.sh" +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh namenodes=$("$HADOOP_PREFIX/bin/hdfs" getconf -nnRpcAddresses) if [ "$?" != '0' ] ; then errorFlag='1' ; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh index b6b3aa7f8b..24c622764e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh @@ -18,7 +18,9 @@ bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh # Start balancer daemon. 
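Every hunk in this change introduces the same fallback: HADOOP_LIBEXEC_DIR is honored when it is already set and only defaults to the libexec directory next to the invoking script. A minimal shell sketch of how the override is exercised (the /usr/lib/hadoop/libexec path is an assumption for illustration only, not part of the patch):

# Default behaviour: with HADOOP_LIBEXEC_DIR unset, each script still sources
# hdfs-config.sh from "$bin"/../libexec, exactly as before this change.
"$HADOOP_PREFIX"/bin/hdfs getconf -nnRpcAddresses

# Relocated layout: pre-setting HADOOP_LIBEXEC_DIR makes the same scripts
# source their config from the alternate location instead.
export HADOOP_LIBEXEC_DIR=/usr/lib/hadoop/libexec   # assumed path, illustrative only
"$HADOOP_PREFIX"/bin/hdfs getconf -nnRpcAddresses

The ${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} expansion keeps existing installs working unchanged while letting packaged layouts relocate libexec.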
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh index d6ed5f99e6..d6d03f7f8f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh @@ -25,7 +25,9 @@ usage="Usage: start-dfs.sh [-upgrade|-rollback]" bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh # get arguments if [ $# -ge 1 ]; then diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-secure-dns.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-secure-dns.sh index bcb9b8f82d..7ddf687922 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-secure-dns.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-secure-dns.sh @@ -22,7 +22,9 @@ usage="Usage (run as root in order to start secure datanodes): start-secure-dns. bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh if [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then "$HADOOP_PREFIX"/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs start datanode $dataStartOpt diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh index 7edd0bd311..5026c8c7cc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh @@ -18,7 +18,9 @@ bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh # Stop balancer daemon. # Run this on the machine where the balancer is running diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh index e64a5ea879..11788e24b7 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh @@ -18,7 +18,9 @@ bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hdfs-config.sh #--------------------------------------------------------- # namenodes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-secure-dns.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-secure-dns.sh index a0ae1728d4..fdd47c3891 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-secure-dns.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-secure-dns.sh @@ -22,7 +22,9 @@ usage="Usage (run as root in order to stop secure datanodes): stop-secure-dns.sh bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/../libexec/hdfs-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. 
$HADOOP_LIBEXEC_DIR/hdfs-config.sh if [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then "$HADOOP_PREFIX"/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs stop datanode From bd2e2aaf996b54d93f5d7b648ec1ed2b70969f00 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 00:57:08 +0000 Subject: [PATCH 07/10] =?UTF-8?q?MAPREDUCE-3373.=20Hadoop=20scripts=20unco?= =?UTF-8?q?nditionally=20source=20"$bin"/../libexec/hadoop-config.sh.=20Co?= =?UTF-8?q?ntributed=20by=20Bruno=20Mah=C3=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203455 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ hadoop-mapreduce-project/bin/mapred | 6 ++++-- hadoop-mapreduce-project/bin/mapred-config.sh | 6 ++++-- hadoop-mapreduce-project/hadoop-yarn/bin/slaves.sh | 4 +++- hadoop-mapreduce-project/hadoop-yarn/bin/start-all.sh | 5 ++++- hadoop-mapreduce-project/hadoop-yarn/bin/stop-all.sh | 5 ++++- hadoop-mapreduce-project/hadoop-yarn/bin/yarn | 4 +++- hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh | 4 +++- hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemons.sh | 4 +++- .../src/contrib/raid/bin/start-raidnode-remote.sh | 4 +++- .../src/contrib/raid/bin/start-raidnode.sh | 4 +++- .../src/contrib/raid/bin/stop-raidnode-remote.sh | 4 +++- .../src/contrib/raid/bin/stop-raidnode.sh | 4 +++- 13 files changed, 43 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d2f0e190e5..d4a1197fb6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -100,6 +100,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3102. Changed NodeManager to fail fast when LinuxContainerExecutor has wrong configuration or permissions. (Hitesh Shah via vinodkv) + MAPREDUCE-3373. Hadoop scripts unconditionally source + "$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred index e5e9efb413..de82a03422 100755 --- a/hadoop-mapreduce-project/bin/mapred +++ b/hadoop-mapreduce-project/bin/mapred @@ -19,8 +19,10 @@ bin=`which $0` bin=`dirname ${bin}` bin=`cd "$bin"; pwd` -if [ -e $bin/../libexec/mapred-config.sh ]; then - . $bin/../libexec/mapred-config.sh +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +if [ -e ${HADOOP_LIBEXEC_DIR}/mapred-config.sh ]; then + . ${HADOOP_LIBEXEC_DIR}/mapred-config.sh else . "$bin/mapred-config.sh" fi diff --git a/hadoop-mapreduce-project/bin/mapred-config.sh b/hadoop-mapreduce-project/bin/mapred-config.sh index c79f4fb62f..d1eb627adb 100644 --- a/hadoop-mapreduce-project/bin/mapred-config.sh +++ b/hadoop-mapreduce-project/bin/mapred-config.sh @@ -22,8 +22,10 @@ bin=`which "$0"` bin=`dirname "${bin}"` bin=`cd "$bin"; pwd` -if [ -e "$bin/../libexec/hadoop-config.sh" ]; then - . "$bin/../libexec/hadoop-config.sh" +DEFAULT_LIBEXEC_DIR="$bin"/../libexec +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +if [ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]; then + . "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" elif [ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]; then . 
"$HADOOP_COMMON_HOME"/libexec/hadoop-config.sh elif [ -e "${HADOOP_COMMON_HOME}/bin/hadoop-config.sh" ]; then diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/slaves.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/slaves.sh index e32a4514a3..ee83477901 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/slaves.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/slaves.sh @@ -38,7 +38,9 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/yarn-config.sh # If the slaves file is specified in the command line, # then it takes precedence over the definition in diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/start-all.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/start-all.sh index e1a798f2d4..fa4fcf3d0d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/start-all.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/start-all.sh @@ -23,7 +23,10 @@ echo "starting yarn daemons" bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/yarn-config.sh + # start resourceManager "$bin"/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager # start nodeManager diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/stop-all.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/stop-all.sh index 850af3eb01..546b67f5c9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/stop-all.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/stop-all.sh @@ -23,7 +23,10 @@ echo "stopping yarn daemons" bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/yarn-config.sh + # stop resourceManager "$bin"/yarn-daemon.sh --config $YARN_CONF_DIR stop resourcemanager # stop nodeManager diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn index 059bf10d07..b11c94b37e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn @@ -44,7 +44,9 @@ bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/yarn-config.sh cygwin=false case "`uname`" in diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh index c3d1724270..39bb76cbe9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemon.sh @@ -39,7 +39,9 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. "$bin"/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/yarn-config.sh # get arguments startStop=$1 diff --git a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemons.sh b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemons.sh index 4f89a6850c..e34e4ca8b1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemons.sh +++ b/hadoop-mapreduce-project/hadoop-yarn/bin/yarn-daemons.sh @@ -30,7 +30,9 @@ fi bin=`dirname "${BASH_SOURCE-$0}"` bin=`cd "$bin"; pwd` -. $bin/yarn-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. 
$HADOOP_LIBEXEC_DIR/yarn-config.sh exec "$bin/slaves.sh" --config $YARN_CONF_DIR cd "$YARN_HOME" \; "$bin/yarn-daemon.sh" --config $YARN_CONF_DIR "$@" diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh b/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh index c0aefd31ce..3e9405eb72 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh +++ b/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh @@ -24,7 +24,9 @@ params=$# bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin"/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh # get arguments if [ $# -ge 1 ]; then diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh b/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh index 846aa80c68..d193bb1aeb 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh +++ b/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh @@ -25,7 +25,9 @@ params=$# bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin"/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh # get arguments if [ $# -ge 1 ]; then diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh b/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh index ed870ca5f2..d185629b23 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh +++ b/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh @@ -24,7 +24,9 @@ params=$# bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin"/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. $HADOOP_LIBEXEC_DIR/hadoop-config.sh # get arguments if [ $# -ge 1 ]; then diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh b/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh index d207e5aefe..02a61a87f7 100644 --- a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh +++ b/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh @@ -25,7 +25,9 @@ params=$# bin=`dirname "$0"` bin=`cd "$bin"; pwd` -. "$bin"/hadoop-config.sh +DEFAULT_LIBEXEC_DIR="$bin" +HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} +. 
$HADOOP_LIBEXEC_DIR/hadoop-config.sh # get arguments if [ $# -ge 1 ]; then From 6a01c22755cbeae86d181e3c09897d4827d5a3d4 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 05:10:40 +0000 Subject: [PATCH 08/10] =?UTF-8?q?HDFS-2543.=20HADOOP=5FPREFIX=20cannot=20b?= =?UTF-8?q?e=20overridden.=20Contributed=20by=20Bruno=20Mah=C3=A9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203486 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++ .../hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh | 4 +--- .../src/main/packages/deb/init.d/hadoop-secondarynamenode | 2 +- .../src/main/packages/rpm/init.d/hadoop-secondarynamenode | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fe21d840be..a27543d301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -112,6 +112,8 @@ Release 0.23.1 - UNRELEASED HDFS-2544. Hadoop scripts unconditionally source "$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite) + HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh index 549b435e7f..97239cc480 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh @@ -16,9 +16,7 @@ # limitations under the License. 
# -if [ "$HADOOP_PREFIX" = "" ]; then -export HADOOP_PREFIX=/usr/local/share/hadoop -fi +export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr/local/share/hadoop} if [ "$OS_ARCH" = "" ]; then export OS_ARCH=amd64 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode b/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode index 1b08cd38b8..089e372b71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode @@ -67,7 +67,7 @@ check_privsep_dir() { } export PATH="${PATH:+$PATH:}/usr/sbin:/usr/bin" -export HADOOP_PREFIX="/usr" +export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr} case "$1" in start) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode b/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode index 81fb7445cd..d9ccfc3a9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode @@ -27,7 +27,7 @@ source /etc/default/hadoop-env.sh RETVAL=0 PIDFILE="${HADOOP_PID_DIR}/hadoop-hdfs-secondarynamenode.pid" desc="Hadoop secondary namenode daemon" -export HADOOP_PREFIX="/usr" +export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr} start() { echo -n $"Starting $desc (hadoop-secondarynamenode): " From bd0fbbb9429e9c82ed4ef52e33fec8afb63dd9c2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 18 Nov 2011 05:14:15 +0000 Subject: [PATCH 09/10] =?UTF-8?q?MAPREDUCE-3372.=20HADOOP=5FPREFIX=20canno?= =?UTF-8?q?t=20be=20overridden.=20Contributed=20by=20Bruno=20Mah=C3=A9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203488 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ hadoop-mapreduce-project/src/examples/python/compile | 2 +- hadoop-mapreduce-project/src/examples/python/pyAbacus/compile | 2 +- .../src/packages/deb/init.d/hadoop-historyserver | 2 +- .../src/packages/rpm/init.d/hadoop-historyserver | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d4a1197fb6..8e4a8b3b45 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -103,6 +103,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3373. Hadoop scripts unconditionally source "$bin"/../libexec/hadoop-config.sh. (Bruno Mahé via tomwhite) + MAPREDUCE-3372. HADOOP_PREFIX cannot be overridden. + (Bruno Mahé via tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/src/examples/python/compile b/hadoop-mapreduce-project/src/examples/python/compile index e202a1ccbb..32c80ed921 100644 --- a/hadoop-mapreduce-project/src/examples/python/compile +++ b/hadoop-mapreduce-project/src/examples/python/compile @@ -12,7 +12,7 @@ # limitations under the License. -export HADOOP_PREFIX=../../.. 
+export HADOOP_PREFIX=${HADOOP_PREFIX:-../../..} export CLASSPATH="$HADOOP_PREFIX/build/classes" diff --git a/hadoop-mapreduce-project/src/examples/python/pyAbacus/compile b/hadoop-mapreduce-project/src/examples/python/pyAbacus/compile index c06573b8da..641d5ff729 100644 --- a/hadoop-mapreduce-project/src/examples/python/pyAbacus/compile +++ b/hadoop-mapreduce-project/src/examples/python/pyAbacus/compile @@ -12,7 +12,7 @@ # limitations under the License. -export HADOOP_PREFIX=../../../../.. +export HADOOP_PREFIX=${HADOOP_PREFIX:-../../../../..} export CLASSPATH="$HADOOP_PREFIX/build/classes" export CLASSPATH=${CLASSPATH}:"$HADOOP_PREFIX/build/contrib/abacus/classes" diff --git a/hadoop-mapreduce-project/src/packages/deb/init.d/hadoop-historyserver b/hadoop-mapreduce-project/src/packages/deb/init.d/hadoop-historyserver index 4421f5538e..6334729cd9 100644 --- a/hadoop-mapreduce-project/src/packages/deb/init.d/hadoop-historyserver +++ b/hadoop-mapreduce-project/src/packages/deb/init.d/hadoop-historyserver @@ -67,7 +67,7 @@ check_privsep_dir() { } export PATH="${PATH:+$PATH:}/usr/sbin:/usr/bin" -export HADOOP_PREFIX="/usr" +export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr} case "$1" in start) diff --git a/hadoop-mapreduce-project/src/packages/rpm/init.d/hadoop-historyserver b/hadoop-mapreduce-project/src/packages/rpm/init.d/hadoop-historyserver index 71d1658327..5677593870 100644 --- a/hadoop-mapreduce-project/src/packages/rpm/init.d/hadoop-historyserver +++ b/hadoop-mapreduce-project/src/packages/rpm/init.d/hadoop-historyserver @@ -27,7 +27,7 @@ source /etc/default/hadoop-env.sh RETVAL=0 PIDFILE="${HADOOP_PID_DIR}/hadoop-mapred-historyserver.pid" desc="Hadoop historyserver daemon" -export HADOOP_PREFIX="/usr" +export HADOOP_PREFIX=${HADOOP_PREFIX:-/usr} start() { echo -n $"Starting $desc (hadoop-historyserver): " From 905a127850d5e0cba85c2e075f989fa0f5cf129a Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 18 Nov 2011 09:04:20 +0000 Subject: [PATCH 10/10] HDFS-2562. Refactor DN configuration variables out of DataNode class. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203543 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BlockReceiver.java | 10 +- .../hdfs/server/datanode/BlockSender.java | 6 +- .../hadoop/hdfs/server/datanode/DNConf.java | 115 +++++++++++++++ .../hadoop/hdfs/server/datanode/DataNode.java | 134 ++++-------------- .../hdfs/server/datanode/DataXceiver.java | 50 +++---- .../datanode/TestInterDatanodeProtocol.java | 2 +- 7 files changed, 176 insertions(+), 144 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a27543d301..3ee413ea7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -114,6 +114,9 @@ Release 0.23.1 - UNRELEASED HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite) + HDFS-2562. Refactor DN configuration variables out of DataNode class + (todd) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. 
(todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 9277956e1f..4b961522d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -185,8 +185,8 @@ class BlockReceiver implements Closeable { " while receiving block " + block + " from " + inAddr); } } - this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites(); - this.syncBehindWrites = datanode.shouldSyncBehindWrites(); + this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites; + this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; @@ -249,7 +249,7 @@ public void close() throws IOException { try { if (checksumOut != null) { checksumOut.flush(); - if (datanode.syncOnClose && (cout instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) { ((FileOutputStream)cout).getChannel().force(true); } checksumOut.close(); @@ -265,7 +265,7 @@ public void close() throws IOException { try { if (out != null) { out.flush(); - if (datanode.syncOnClose && (out instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) { ((FileOutputStream)out).getChannel().force(true); } out.close(); @@ -435,7 +435,7 @@ private void readNextPacket() throws IOException { * calculation in DFSClient to make the guess accurate. */ int chunkSize = bytesPerChecksum + checksumSize; - int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN + int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN + chunkSize - 1)/chunkSize; buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN + Math.max(chunksPerPacket, 1) * chunkSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index f4168ee1c9..cf4e803260 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -185,8 +185,8 @@ class BlockSender implements java.io.Closeable { this.corruptChecksumOk = corruptChecksumOk; this.verifyChecksum = verifyChecksum; this.clientTraceFmt = clientTraceFmt; - this.readaheadLength = datanode.getReadaheadLength(); - this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads(); + this.readaheadLength = datanode.getDnConf().readaheadLength; + this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads; synchronized(datanode.data) { this.replica = getReplica(block, datanode); @@ -215,7 +215,7 @@ class BlockSender implements java.io.Closeable { // transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // use normal transfer in those cases - this.transferToAllowed = datanode.transferToAllowed && + this.transferToAllowed = datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE); DataChecksum csum; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java new file mode 100644 index 0000000000..e4bf9a676d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + +/** + * Simple class encapsulating all of the configuration that the DataNode + * loads at startup time. 
+ */ +class DNConf { + final int socketTimeout; + final int socketWriteTimeout; + final int socketKeepaliveTimeout; + + final boolean transferToAllowed; + final boolean dropCacheBehindWrites; + final boolean syncBehindWrites; + final boolean dropCacheBehindReads; + final boolean syncOnClose; + + + final long readaheadLength; + final long heartBeatInterval; + final long blockReportInterval; + final long deleteReportInterval; + final long initialBlockReportDelay; + final int writePacketSize; + + public DNConf(Configuration conf) { + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsServerConstants.READ_TIMEOUT); + socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsServerConstants.WRITE_TIMEOUT); + socketKeepaliveTimeout = conf.getInt( + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); + + /* Based on results on different platforms, we might need set the default + * to false on some of them. */ + transferToAllowed = conf.getBoolean( + DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, + DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); + + writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + + readaheadLength = conf.getLong( + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + dropCacheBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); + syncBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); + dropCacheBehindReads = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); + + this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + + long initBRDelay = conf.getLong( + DFS_BLOCKREPORT_INITIAL_DELAY_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; + if (initBRDelay >= blockReportInterval) { + initBRDelay = 0; + DataNode.LOG.info("dfs.blockreport.initialDelay is greater than " + + "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); + } + initialBlockReportDelay = initBRDelay; + + heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, + DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; + + this.deleteReportInterval = 100 * heartBeatInterval; + // do we need to sync block file contents to disk when blockfile is closed? 
+ this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, + DFS_DATANODE_SYNCONCLOSE_DEFAULT); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 32fe56c63f..df8a2adcca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -19,15 +19,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; @@ -51,17 +44,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.server.common.Util.now; @@ -104,7 +90,6 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -402,10 +387,7 @@ void refreshNamenodes(Configuration conf) AtomicInteger xmitsInProgress = new AtomicInteger(); 
Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; - long blockReportInterval; - long deleteReportInterval; - long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; - long heartBeatInterval; + private DNConf dnConf; private boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; @@ -415,18 +397,9 @@ void refreshNamenodes(Configuration conf) private volatile String hostName; // Host name of this datanode private static String dnThreadName; - int socketTimeout; - int socketWriteTimeout = 0; - boolean transferToAllowed = true; - private boolean dropCacheBehindWrites = false; - private boolean syncBehindWrites = false; - private boolean dropCacheBehindReads = false; - private long readaheadLength = 0; - int writePacketSize = 0; boolean isBlockTokenEnabled; BlockPoolTokenSecretManager blockPoolTokenSecretManager; - boolean syncOnClose; public DataBlockScanner blockScanner = null; private DirectoryScanner directoryScanner = null; @@ -494,51 +467,6 @@ private static String getHostName(Configuration config) return name; } - private void initConfig(Configuration conf) { - this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT); - this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, - HdfsServerConstants.WRITE_TIMEOUT); - /* Based on results on different platforms, we might need set the default - * to false on some of them. */ - this.transferToAllowed = conf.getBoolean( - DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, - DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); - this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, - DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - - this.readaheadLength = conf.getLong( - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - this.dropCacheBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); - this.syncBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); - this.dropCacheBehindReads = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); - - this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - this.initialBlockReportDelay = conf.getLong( - DFS_BLOCKREPORT_INITIAL_DELAY_KEY, - DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; - if (this.initialBlockReportDelay >= blockReportInterval) { - this.initialBlockReportDelay = 0; - LOG.info("dfs.blockreport.initialDelay is greater than " + - "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); - } - this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, - DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; - - this.deleteReportInterval = 100 * heartBeatInterval; - // do we need to sync block file contents to disk when blockfile is closed? 
- this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, - DFS_DATANODE_SYNCONCLOSE_DEFAULT); - } - private void startInfoServer(Configuration conf) throws IOException { // create a servlet to serve full-file content InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); @@ -709,7 +637,7 @@ private void initDataXceiver(Configuration conf) throws IOException { // find free port or use privileged port provided ServerSocket ss; if(secureResources == null) { - ss = (socketWriteTimeout > 0) ? + ss = (dnConf.socketWriteTimeout > 0) ? ServerSocketChannel.open().socket() : new ServerSocket(); Server.bind(ss, socAddr, 0); } else { @@ -794,11 +722,13 @@ static class BPOfferService implements Runnable { private volatile boolean shouldServiceRun = true; UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; + private final DNConf dnConf; BPOfferService(InetSocketAddress nnAddr, DataNode dn) { this.dn = dn; this.bpRegistration = dn.createRegistration(); this.nnAddr = nnAddr; + this.dnConf = dn.getDnConf(); } /** @@ -900,9 +830,9 @@ void setupBP(Configuration conf) void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - dn.blockReportInterval; + lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -1007,7 +937,7 @@ DatanodeCommand blockReport() throws IOException { // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > dn.blockReportInterval) { + if (startTime - lastBlockReport > dnConf.blockReportInterval) { // Create block report long brCreateStartTime = now(); @@ -1029,7 +959,7 @@ DatanodeCommand blockReport() throws IOException { // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report @@ -1039,7 +969,7 @@ DatanodeCommand blockReport() throws IOException { * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - dn.blockReportInterval * dn.blockReportInterval; + dnConf.blockReportInterval * dnConf.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1101,10 +1031,10 @@ private synchronized void cleanUp() { */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of " - + dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " - + dn.blockReportInterval + "msec" + " Initial delay: " - + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + dn.heartBeatInterval); + + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + + dnConf.blockReportInterval + "msec" + " Initial delay: " + + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dnConf.heartBeatInterval); // // Now loop for a long time.... 
@@ -1116,7 +1046,7 @@ private void offerService() throws Exception { // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > dn.heartBeatInterval) { + if (startTime - lastHeartbeat > dnConf.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1140,7 +1070,7 @@ private void offerService() throws Exception { } } if (pendingReceivedRequests > 0 - || (startTime - lastDeletedReport > dn.deleteReportInterval)) { + || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) { reportReceivedDeletedBlocks(); lastDeletedReport = startTime; } @@ -1157,7 +1087,7 @@ private void offerService() throws Exception { // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = dn.heartBeatInterval - + long waitTime = dnConf.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedAndDeletedBlockList) { if (waitTime > 0 && pendingReceivedRequests == 0) { @@ -1180,7 +1110,7 @@ private void offerService() throws Exception { } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, dn.heartBeatInterval); + long sleepTime = Math.min(1000, dnConf.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1248,7 +1178,7 @@ void register() throws IOException { LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(dn.initialBlockReportDelay); + scheduleBlockReport(dnConf.initialBlockReportDelay); } @@ -1458,11 +1388,11 @@ void startDataNode(Configuration conf, this.secureResources = resources; this.dataDirs = dataDirs; this.conf = conf; + this.dnConf = new DNConf(conf); storage = new DataStorage(); // global DN settings - initConfig(conf); registerMXBean(); initDataXceiver(conf); startInfoServer(conf); @@ -1710,7 +1640,7 @@ DatanodeRegistration getDNRegistrationByMachineName(String mName) { * Creates either NIO or regular depending on socketWriteTimeout. */ protected Socket newSocket() throws IOException { - return (socketWriteTimeout > 0) ? + return (dnConf.socketWriteTimeout > 0) ? SocketChannel.open().socket() : new Socket(); } @@ -2135,10 +2065,10 @@ public void run() { InetSocketAddress curTarget = NetUtils.createSocketAddr(targets[0].getName()); sock = newSocket(); - NetUtils.connect(sock, curTarget, socketTimeout); - sock.setSoTimeout(targets.length * socketTimeout); + NetUtils.connect(sock, curTarget, dnConf.socketTimeout); + sock.setSoTimeout(targets.length * dnConf.socketTimeout); - long writeTimeout = socketWriteTimeout + + long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); out = new DataOutputStream(new BufferedOutputStream(baseStream, @@ -2581,7 +2511,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException { DatanodeRegistration bpReg = bpos.bpRegistration; InterDatanodeProtocol datanode = bpReg.equals(id)? 
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), - socketTimeout); + dnConf.socketTimeout); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && @@ -2970,20 +2900,8 @@ public Long getBalancerBandwidth() { (DataXceiverServer) this.dataXceiverServer.getRunnable(); return dxcs.balanceThrottler.getBandwidth(); } - - long getReadaheadLength() { - return readaheadLength; - } - - boolean shouldDropCacheBehindWrites() { - return dropCacheBehindWrites; - } - - boolean shouldDropCacheBehindReads() { - return dropCacheBehindReads; - } - boolean shouldSyncBehindWrites() { - return syncBehindWrites; + DNConf getDnConf() { + return dnConf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d6a3963c0b..11282a5b7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -38,7 +38,6 @@ import java.util.Arrays; import org.apache.commons.logging.Log; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -82,9 +81,9 @@ class DataXceiver extends Receiver implements Runnable { private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; + private final DNConf dnConf; private final DataXceiverServer dataXceiverServer; - private int socketKeepaliveTimeout; private long opStartTime; //the start time of receiving an Op public DataXceiver(Socket s, DataNode datanode, @@ -95,14 +94,11 @@ public DataXceiver(Socket s, DataNode datanode, this.s = s; this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; + this.dnConf = datanode.getDnConf(); this.dataXceiverServer = dataXceiverServer; remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); - socketKeepaliveTimeout = datanode.getConf().getInt( - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); @@ -144,8 +140,8 @@ public void run() { try { if (opsProcessed != 0) { - assert socketKeepaliveTimeout > 0; - s.setSoTimeout(socketKeepaliveTimeout); + assert dnConf.socketKeepaliveTimeout > 0; + s.setSoTimeout(dnConf.socketKeepaliveTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -180,7 +176,7 @@ public void run() { opStartTime = now(); processOp(op); ++opsProcessed; - } while (!s.isClosed() && socketKeepaliveTimeout > 0); + } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); } catch (Throwable t) { LOG.error(datanode.getMachineName() + ":DataXceiver error processing " + ((op == null) ? 
"unknown" : op.name()) + " operation " + @@ -205,7 +201,7 @@ public void readBlock(final ExtendedBlock block, final long blockOffset, final long length) throws IOException { OutputStream baseStream = NetUtils.getOutputStream(s, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(out, true, block, blockToken, @@ -231,13 +227,13 @@ public void readBlock(final ExtendedBlock block, } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); throw e; } // send op status writeSuccessWithChecksumInfo(blockSender, - getStreamWithTimeout(s, datanode.socketWriteTimeout)); + getStreamWithTimeout(s, dnConf.socketWriteTimeout)); long read = blockSender.sendBlock(out, baseStream, null); // send data @@ -335,7 +331,7 @@ public void writeBlock(final ExtendedBlock block, // reply to upstream datanode or client final DataOutputStream replyOut = new DataOutputStream( new BufferedOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout), + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); @@ -370,9 +366,9 @@ public void writeBlock(final ExtendedBlock block, mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorSock = datanode.newSocket(); try { - int timeoutValue = datanode.socketTimeout + int timeoutValue = dnConf.socketTimeout + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length); - int writeTimeout = datanode.socketWriteTimeout + + int writeTimeout = dnConf.socketWriteTimeout + (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); @@ -508,7 +504,7 @@ public void transferBlock(final ExtendedBlock blk, updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); @@ -521,7 +517,7 @@ public void transferBlock(final ExtendedBlock blk, public void blockChecksum(final ExtendedBlock block, final Token blockToken) throws IOException { final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); @@ -581,7 +577,7 @@ public void copyBlock(final ExtendedBlock block, LOG.warn("Invalid access token in request from " + remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage()); - sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout); + sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout); return; } @@ -591,7 +587,7 @@ public void copyBlock(final ExtendedBlock block, String msg = "Not able to copy block " + block.getBlockId() + " to " + s.getRemoteSocketAddress() + " because threads quota is 
exceeded."; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -606,7 +602,7 @@ public void copyBlock(final ExtendedBlock block, // set up response stream OutputStream baseStream = NetUtils.getOutputStream( - s, datanode.socketWriteTimeout); + s, dnConf.socketWriteTimeout); reply = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -659,7 +655,7 @@ public void replaceBlock(final ExtendedBlock block, + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage()); sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); return; } } @@ -668,7 +664,7 @@ public void replaceBlock(final ExtendedBlock block, String msg = "Not able to receive block " + block.getBlockId() + " from " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.warn(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -684,11 +680,11 @@ public void replaceBlock(final ExtendedBlock block, InetSocketAddress proxyAddr = NetUtils.createSocketAddr( proxySource.getName()); proxySock = datanode.newSocket(); - NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout); - proxySock.setSoTimeout(datanode.socketTimeout); + NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); + proxySock.setSoTimeout(dnConf.socketTimeout); OutputStream baseStream = NetUtils.getOutputStream(proxySock, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -750,7 +746,7 @@ public void replaceBlock(final ExtendedBlock block, // send response back try { - sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout); + sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout); } catch (IOException ioe) { LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); } @@ -826,7 +822,7 @@ private void checkAccess(DataOutputStream out, final boolean reply, if (reply) { if (out == null) { out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); } BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index 90869a2637..487adfe5bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -154,7 +154,7 @@ public void testBlockMetaDataInfo() throws Exception { //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy( - datanodeinfo[0], conf, datanode.socketTimeout); + datanodeinfo[0], conf, datanode.getDnConf().socketTimeout); assertTrue(datanode != null); //stop block scanner, so we could compare lastScanTime