From 266a4ff6a502ebb27d1ec7468c539e304b4bcf47 Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Tue, 22 Jul 2014 01:30:36 +0000
Subject: [PATCH 1/4] YARN-2131. Addendum. Add a way to format the RMStateStore.
 (Robert Kanter via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612443 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/hadoop-yarn/bin/yarn | 36 ++++++++++---------
 .../resourcemanager/ResourceManager.java |  4 +--
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn
index a16026c7b9..200ab27d38 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/yarn
+++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn
@@ -61,23 +61,27 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
 function print_usage(){
   echo "Usage: yarn [--config confdir] COMMAND"
   echo "where COMMAND is one of:"
-  echo " resourcemanager -format deletes the RMStateStore"
-  echo " resourcemanager run the ResourceManager"
-  echo " nodemanager run a nodemanager on each slave"
-  echo " timelineserver run the timeline server"
-  echo " rmadmin admin tools"
-  echo " version print the version"
-  echo " jar run a jar file"
-  echo " application prints application(s) report/kill application"
-  echo " applicationattempt prints applicationattempt(s) report"
-  echo " container prints container(s) report"
-  echo " node prints node report(s)"
-  echo " logs dump container logs"
-  echo " classpath prints the class path needed to get the"
-  echo " Hadoop jar and the required libraries"
-  echo " daemonlog get/set the log level for each daemon"
+  echo " resourcemanager -format-state-store deletes the RMStateStore"
+  echo " resourcemanager run the ResourceManager"
+  echo " nodemanager run a nodemanager on each slave"
+  echo " timelineserver run the timeline server"
+  echo " rmadmin admin tools"
+  echo " version print the version"
+  echo " jar run a jar file"
+  echo " application prints application(s)"
+  echo " report/kill application"
+  echo " applicationattempt prints applicationattempt(s)"
+  echo " report"
+  echo " container prints container(s) report"
+  echo " node prints node report(s)"
+  echo " logs dump container logs"
+  echo " classpath prints the class path needed to"
+  echo " get the Hadoop jar and the"
+  echo " required libraries"
+  echo " daemonlog get/set the log level for each"
+  echo " daemon"
   echo " or"
-  echo " CLASSNAME run the class named CLASSNAME"
+  echo " CLASSNAME run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
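For quick reference, a condensed sketch of the dispatch this renamed flag drives, based on the ResourceManager.main() hunk later in this patch; the shutdown hook, service init and error handling are omitted here:

    // Invoked as: yarn resourcemanager -format-state-store
    public static void main(String[] argv) {
      Configuration conf = new YarnConfiguration();
      if (argv.length == 1 && argv[0].equals("-format-state-store")) {
        deleteRMStateStore(conf);   // wipe the RMStateStore instead of starting the RM
      } else {
        // normal startup path (condensed)
        ResourceManager resourceManager = new ResourceManager();
      }
    }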
} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e2deaa11c6..409ba4893b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1035,8 +1035,8 @@ public static void main(String argv[]) { StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { Configuration conf = new YarnConfiguration(); - // If -format, then delete RMStateStore; else startup normally - if (argv.length == 1 && argv[0].equals("-format")) { + // If -format-state-store, then delete RMStateStore; else startup normally + if (argv.length == 1 && argv[0].equals("-format-state-store")) { deleteRMStateStore(conf); } else { ResourceManager resourceManager = new ResourceManager(); From 537c361f5bc888dd74fbe59afe18d08d0746894f Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 22 Jul 2014 03:01:58 +0000 Subject: [PATCH 2/4] YARN-2013. The diagnostics is always the ExitCodeException stack when the container crashes. (Contributed by Tsuyoshi OZAWA) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612449 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../nodemanager/DefaultContainerExecutor.java | 20 ++- .../nodemanager/LinuxContainerExecutor.java | 19 ++- .../TestDefaultContainerExecutor.java | 121 ++++++++++++++++-- .../TestLinuxContainerExecutorWithMocks.java | 41 +++++- 5 files changed, 185 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 19bf0b4c45..a601bb2bbc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -56,6 +56,9 @@ Release 2.6.0 - UNRELEASED YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe) + YARN-2013. The diagnostics is always the ExitCodeException stack when the container + crashes. 
(Tsuyoshi OZAWA via junping_du) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9e2e1113ae..d45371ad0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import com.google.common.base.Optional; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -212,10 +213,21 @@ public int launchContainer(Container container, && exitCode != ExitCode.TERMINATED.getExitCode()) { LOG.warn("Exception from container-launch with container ID: " + containerId + " and exit code: " + exitCode , e); - logOutput(shExec.getOutput()); - String diagnostics = "Exception from container-launch: " - + e + "\n" - + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); + + StringBuilder builder = new StringBuilder(); + builder.append("Exception from container-launch.\n"); + builder.append("Container id: " + containerId + "\n"); + builder.append("Exit code: " + exitCode + "\n"); + if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { + builder.append("Exception message: " + e.getMessage() + "\n"); + } + builder.append("Stack trace: " + + StringUtils.stringifyException(e) + "\n"); + if (!shExec.getOutput().isEmpty()) { + builder.append("Shell output: " + shExec.getOutput() + "\n"); + } + String diagnostics = builder.toString(); + logOutput(diagnostics); container.handle(new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index cbdcb13d40..e03562678c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import com.google.common.base.Optional; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -296,9 +297,21 @@ public int launchContainer(Container container, && exitCode != ExitCode.TERMINATED.getExitCode()) { LOG.warn("Exception from container-launch with container ID: " + containerId + " and exit code: " + exitCode , e); - logOutput(shExec.getOutput()); - String diagnostics = "Exception from container-launch: \n" - + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); + + StringBuilder builder = new StringBuilder(); + builder.append("Exception from container-launch.\n"); + 
builder.append("Container id: " + containerId + "\n"); + builder.append("Exit code: " + exitCode + "\n"); + if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { + builder.append("Exception message: " + e.getMessage() + "\n"); + } + builder.append("Stack trace: " + + StringUtils.stringifyException(e) + "\n"); + if (!shExec.getOutput().isEmpty()) { + builder.append("Shell output: " + shExec.getOutput() + "\n"); + } + String diagnostics = builder.toString(); + logOutput(diagnostics); container.handle(new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 63f3a9f127..9c86c71c83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -18,16 +18,37 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.junit.Assert.assertTrue; + +import java.io.File; import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.InputStream; import java.io.IOException; +import java.io.LineNumberReader; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Random; -import org.junit.Assert; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; @@ -45,15 +66,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; -import static org.apache.hadoop.fs.CreateFlag.*; - - import org.junit.AfterClass; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; -import static org.mockito.Mockito.*; +import org.junit.After; +import org.junit.Assert; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestDefaultContainerExecutor { @@ -191,6 +210,92 @@ public void testDirPermissions() throws Exception { } } + 
@Test + public void testContainerLaunchError() + throws IOException, InterruptedException { + + Path localDir = new Path(BASE_TMP_PATH, "localDir"); + List localDirs = new ArrayList(); + localDirs.add(localDir.toString()); + List logDirs = new ArrayList(); + Path logDir = new Path(BASE_TMP_PATH, "logDir"); + logDirs.add(logDir.toString()); + + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString()); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); + + FileContext lfs = FileContext.getLocalFSFileContext(conf); + DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs)); + mockExec.setConf(conf); + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + String diagnostics = (String) invocationOnMock.getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + diagnostics, + diagnostics.contains("No such file or directory")); + return null; + } + } + ).when(mockExec).logOutput(any(String.class)); + + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String containerId = "CONTAINER_ID"; + Container container = mock(Container.class); + ContainerId cId = mock(ContainerId.class); + ContainerLaunchContext context = mock(ContainerLaunchContext.class); + HashMap env = new HashMap(); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + try { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + ContainerDiagnosticsUpdateEvent event = + (ContainerDiagnosticsUpdateEvent) invocationOnMock + .getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + + event.getDiagnosticsUpdate(), + event.getDiagnosticsUpdate().contains("No such file or directory") + ); + return null; + } + }).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class)); + + when(cId.toString()).thenReturn(containerId); + when(cId.getApplicationAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0)); + + when(context.getEnvironment()).thenReturn(env); + + mockExec.createUserLocalDirs(localDirs, appSubmitter); + mockExec.createUserCacheDirs(localDirs, appSubmitter); + mockExec.createAppDirs(localDirs, appSubmitter, appId); + mockExec.createAppLogDirs(appId, logDirs); + + Path scriptPath = new Path("file:///bin/echo"); + Path tokensPath = new Path("file:///dev/null"); + Path workDir = localDir; + Path pidFile = new Path(workDir, "pid.txt"); + + mockExec.init(); + mockExec.activateContainer(cId, pidFile); + int ret = mockExec + .launchContainer(container, scriptPath, tokensPath, appSubmitter, + appId, workDir, localDirs, localDirs); + Assert.assertNotSame(0, ret); + } finally { + mockExec.deleteAsUser(appSubmitter, localDir); + mockExec.deleteAsUser(appSubmitter, logDir); + } + } + // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index ddffa270be..2e9e8b1df9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; @@ -34,8 +38,6 @@ import java.util.LinkedList; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -46,9 +48,13 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.junit.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestLinuxContainerExecutorWithMocks { @@ -216,7 +222,19 @@ public void testContainerLaunchError() throws IOException { conf.set(YarnConfiguration.NM_LOCAL_DIRS, "file:///bin/echo"); conf.set(YarnConfiguration.NM_LOG_DIRS, "file:///dev/null"); - mockExec = new LinuxContainerExecutor(); + mockExec = spy(new LinuxContainerExecutor()); + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + String diagnostics = (String) invocationOnMock.getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + diagnostics, + diagnostics.contains("badcommand")); + return null; + } + } + ).when(mockExec).logOutput(any(String.class)); dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); mockExec.setConf(conf); @@ -233,7 +251,22 @@ public void testContainerLaunchError() throws IOException { when(container.getContainerId()).thenReturn(cId); when(container.getLaunchContext()).thenReturn(context); - + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + ContainerDiagnosticsUpdateEvent event = + (ContainerDiagnosticsUpdateEvent) invocationOnMock + .getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + + event.getDiagnosticsUpdate(), + event.getDiagnosticsUpdate().contains("badcommand")); + return null; + } + } + ).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class)); + when(cId.toString()).thenReturn(containerId); when(context.getEnvironment()).thenReturn(env); From 70dededdc983646f65176d979f3a0616f49f86eb Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Tue, 22 Jul 2014 04:58:00 +0000 Subject: [PATCH 3/4] =?UTF-8?q?YARN-2270.=20Made=20TestFSDownload#testDown?= =?UTF-8?q?loadPublicWithStatCache=20be=20skipped=20when=20there=E2=80=99s?= =?UTF-8?q?=20no=20ancestor=20permissions.=20Contributed=20by=20Akira=20Aj?= =?UTF-8?q?isaka.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 
8bit git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612460 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../main/java/org/apache/hadoop/yarn/util/FSDownload.java | 5 +++-- .../java/org/apache/hadoop/yarn/util/TestFSDownload.java | 7 +++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a601bb2bbc..7880af5848 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -412,6 +412,9 @@ Release 2.5.0 - UNRELEASED YARN-2269. Remove external links from YARN UI. (Craig Welch via xgong) + YARN-2270. Made TestFSDownload#testDownloadPublicWithStatCache be skipped + when there’s no ancestor permissions. (Akira Ajisaka via zjshen) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 67c3bc7948..8cc5ed3606 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -177,9 +177,10 @@ private static boolean checkPublicPermsForAll(FileSystem fs, /** * Returns true if all ancestors of the specified path have the 'execute' * permission set for all users (i.e. that other users can traverse - * the directory heirarchy to the given path) + * the directory hierarchy to the given path) */ - private static boolean ancestorsHaveExecutePermissions(FileSystem fs, + @VisibleForTesting + static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path, LoadingCache> statCache) throws IOException { Path current = path; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 4d0c8da21c..02ba5fa3b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import java.io.File; import java.io.FileOutputStream; @@ -66,6 +67,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -308,6 +310,11 @@ public void testDownloadPublicWithStatCache() throws IOException, FileContext files = FileContext.getLocalFSFileContext(conf); Path basedir = files.makeQualified(new Path("target", TestFSDownload.class.getSimpleName())); + + // if test directory doesn't have ancestor permission, skip this test + FileSystem f = basedir.getFileSystem(conf); + assumeTrue(FSDownload.ancestorsHaveExecutePermissions(f, basedir, null)); + files.mkdir(basedir, null, true); conf.setStrings(TestFSDownload.class.getName(), 
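As a rough illustration of what the newly test-visible helper checks (simplified; the real method also takes a stat cache as its third argument, which the test passes as null): the path and each of its ancestors must grant 'execute' to other users, and assumeTrue() turns a failed check into a skipped test rather than a failure.

    // FsAction is org.apache.hadoop.fs.permission.FsAction
    static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
        throws IOException {
      for (Path p = path; p != null; p = p.getParent()) {
        FsAction other = fs.getFileStatus(p).getPermission().getOtherAction();
        if (!other.implies(FsAction.EXECUTE)) {
          return false;   // e.g. a 0700 home directory above the test dir
        }
      }
      return true;
    }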
basedir.toString()); From 25b0e8471ed744578b2d8e3f0debe5477b268e54 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 22 Jul 2014 07:41:24 +0000 Subject: [PATCH 4/4] HDFS-6702. Change DFSClient to pass the StorageType from the namenode to datanodes and change datanode to write block replicas using the specified storage type. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612493 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 + .../apache/hadoop/hdfs/DFSOutputStream.java | 50 +++--- .../datatransfer/DataTransferProtocol.java | 21 ++- .../hdfs/protocol/datatransfer/Receiver.java | 11 +- .../hdfs/protocol/datatransfer/Sender.java | 11 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 67 +++++--- .../hadoop/hdfs/server/balancer/Balancer.java | 6 +- .../hdfs/server/datanode/BPOfferService.java | 3 +- .../hdfs/server/datanode/BlockReceiver.java | 10 +- .../hadoop/hdfs/server/datanode/DataNode.java | 150 ++++++++++++++---- .../hdfs/server/datanode/DataXceiver.java | 25 +-- .../datanode/fsdataset/FsDatasetSpi.java | 9 +- .../fsdataset/impl/FsDatasetImpl.java | 74 ++++++--- .../datanode/fsdataset/impl/FsVolumeList.java | 21 ++- .../hdfs/server/protocol/BlockCommand.java | 15 +- .../src/main/proto/DatanodeProtocol.proto | 1 + .../src/main/proto/datatransfer.proto | 4 + .../hadoop-hdfs/src/main/proto/hdfs.proto | 7 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 7 +- .../hadoop/hdfs/TestDataTransferProtocol.java | 54 +++---- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 4 +- .../server/datanode/BlockReportTestBase.java | 5 +- .../server/datanode/SimulatedFSDataset.java | 10 +- .../server/datanode/TestBlockRecovery.java | 6 +- .../server/datanode/TestBlockReplacement.java | 4 +- .../hdfs/server/datanode/TestDiskError.java | 5 +- .../datanode/TestSimulatedFSDataset.java | 4 +- .../fsdataset/impl/TestWriteToReplica.java | 27 ++-- 28 files changed, 436 insertions(+), 179 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ec75643e53..acea2a3b1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -307,6 +307,10 @@ Release 2.6.0 - UNRELEASED HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo) + HDFS-6702. Change DFSClient to pass the StorageType from the namenode to + datanodes and change datanode to write block replicas using the specified + storage type. (szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. 
(wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index e78ee160ba..a7cb92fa26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -313,6 +313,7 @@ class DataStreamer extends Daemon { private DataInputStream blockReplyStream; private ResponseProcessor response = null; private volatile DatanodeInfo[] nodes = null; // list of targets for current block + private volatile StorageType[] storageTypes = null; private volatile String[] storageIDs = null; private final LoadingCache excludedNodes = CacheBuilder.newBuilder() @@ -417,10 +418,12 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, } private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageIDs()); + setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } - private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) { + private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, + String[] storageIDs) { this.nodes = nodes; + this.storageTypes = storageTypes; this.storageIDs = storageIDs; } @@ -446,7 +449,7 @@ private void endBlock() { this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); - setPipeline(null, null); + setPipeline(null, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -1031,10 +1034,12 @@ private void addDatanode2ExistingPipeline() throws IOException { //transfer replica final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; final DatanodeInfo[] targets = {nodes[d]}; - transfer(src, targets, lb.getBlockToken()); + final StorageType[] targetStorageTypes = {storageTypes[d]}; + transfer(src, targets, targetStorageTypes, lb.getBlockToken()); } private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final Token blockToken) throws IOException { //transfer replica to the new datanode Socket sock = null; @@ -1056,7 +1061,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets); + targets, targetStorageTypes); out.flush(); //ack @@ -1135,16 +1140,15 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { failed.add(nodes[errorIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - System.arraycopy(nodes, 0, newnodes, 0, errorIndex); - System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex, - newnodes.length-errorIndex); + arraycopy(nodes, newnodes, errorIndex); + + final StorageType[] newStorageTypes = new StorageType[newnodes.length]; + arraycopy(storageTypes, newStorageTypes, errorIndex); final String[] newStorageIDs = new String[newnodes.length]; - System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex); - System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex, - newStorageIDs.length-errorIndex); + arraycopy(storageIDs, newStorageIDs, errorIndex); - setPipeline(newnodes, newStorageIDs); + setPipeline(newnodes, newStorageTypes, newStorageIDs); // Just took care of a node error while waiting for a node restart if (restartingNodeIndex >= 0) { @@ -1181,7 +1185,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { // set 
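The pipeline is now tracked as three parallel arrays (nodes, storageTypes, storageIDs), so dropping a failed datanode has to keep them aligned. The new private helper, shown here with its generic type parameter written out, together with its use when the node at errorIndex is removed:

    private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
      // copy everything except srcs[skipIndex]; dsts.length == srcs.length - 1
      System.arraycopy(srcs, 0, dsts, 0, skipIndex);
      System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
    }

    // usage in setupPipelineForAppendOrRecovery():
    DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length - 1];
    arraycopy(nodes, newnodes, errorIndex);
    final StorageType[] newStorageTypes = new StorageType[newnodes.length];
    arraycopy(storageTypes, newStorageTypes, errorIndex);
    final String[] newStorageIDs = new String[newnodes.length];
    arraycopy(storageIDs, newStorageIDs, errorIndex);
    setPipeline(newnodes, newStorageTypes, newStorageIDs);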
up the pipeline again with the remaining nodes if (failPacket) { // for testing - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); failPacket = false; try { // Give DNs time to send in bad reports. In real situations, @@ -1190,7 +1194,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { Thread.sleep(2000); } catch (InterruptedException ie) {} } else { - success = createBlockOutputStream(nodes, newGS, isRecovery); + success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); } if (restartingNodeIndex >= 0) { @@ -1242,6 +1246,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { private LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; + StorageType[] storageTypes = null; int count = dfsClient.getConf().nBlockWriteRetry; boolean success = false; ExtendedBlock oldBlock = block; @@ -1264,11 +1269,12 @@ private LocatedBlock nextBlockOutputStream() throws IOException { bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); + storageTypes = lb.getStorageTypes(); // // Connect to first DataNode in the list. // - success = createBlockOutputStream(nodes, 0L, false); + success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { DFSClient.LOG.info("Abandoning " + block); @@ -1289,8 +1295,8 @@ private LocatedBlock nextBlockOutputStream() throws IOException { // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, - boolean recoveryFlag) { + private boolean createBlockOutputStream(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { DFSClient.LOG.info("nodes are empty for write pipeline of block " + block); @@ -1332,9 +1338,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, // Xmit header info to datanode // + BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? 
stage.getRecoveryStage() : stage, + new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken, + dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, cachingStrategy.get()); @@ -2197,4 +2204,9 @@ ExtendedBlock getBlock() { public long getFileId() { return fileId; } + + private static void arraycopy(T[] srcs, T[] dsts, int skipIndex) { + System.arraycopy(srcs, 0, dsts, 0, skipIndex); + System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index d620c6b502..d54d5bed00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -71,11 +72,20 @@ public void readBlock(final ExtendedBlock blk, /** * Write a block to a datanode pipeline. - * + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. * @param blockToken security token for accessing the block. * @param clientName client's name. - * @param targets target datanodes in the pipeline. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. * @param source source datanode. * @param stage pipeline stage. * @param pipelineSize the size of the pipeline. @@ -84,9 +94,11 @@ public void readBlock(final ExtendedBlock blk, * @param latestGenerationStamp the latest generation stamp of the block. */ public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -110,7 +122,8 @@ public void writeBlock(final ExtendedBlock blk, public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException; + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -148,11 +161,13 @@ public void requestShortCircuitFds(final ExtendedBlock blk, * It is used for balancing purpose. 
* * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 5a80f689da..a09437c0b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; @@ -121,10 +122,13 @@ private void opReadBlock() throws IOException { /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList()), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), @@ -140,10 +144,12 @@ private void opWriteBlock(DataInputStream in) throws IOException { private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList())); + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -176,6 +182,7 @@ private void opRequestShortCircuitShm(DataInputStream in) throws IOException { private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index c03f33fb27..68da52399c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -111,9 +112,11 @@ public void readBlock(final ExtendedBlock blk, @Override public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -130,7 +133,9 @@ public void writeBlock(final ExtendedBlock blk, OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) + .setStorageType(PBHelper.convertStorageType(storageType)) .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -150,12 +155,14 @@ public void writeBlock(final ExtendedBlock blk, public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -196,11 +203,13 @@ public void requestShortCircuitShm(String clientName) throws IOException { @Override public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) .setSource(PBHelper.convertDatanodeInfo(source)) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e9435024be..bd7e5f1391 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import 
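Putting the Sender changes together with the DFSOutputStream call earlier in this patch: a pipeline write now supplies one StorageType per datanode. The receiver's own type travels as the standalone storageType field, while Sender serializes targets and targetStorageTypes from index 1 onward, since element 0 describes the datanode this stream is already connected to. A sketch of the client-side call:

    // nodes and nodeStorageTypes are parallel arrays from the LocatedBlock
    new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
        dfsClient.clientName, nodes, nodeStorageTypes, null /* source */,
        stage, nodes.length, block.getNumBytes(), bytesSent, newGS,
        checksum, cachingStrategy.get());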
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; @@ -674,14 +675,8 @@ public static LocatedBlock convert(LocatedBlockProto proto) { targets[i] = PBHelper.convert(locs.get(i)); } - final int storageTypesCount = proto.getStorageTypesCount(); - final StorageType[] storageTypes; - if (storageTypesCount == 0) { - storageTypes = null; - } else { - Preconditions.checkState(storageTypesCount == locs.size()); - storageTypes = convertStorageTypeProtos(proto.getStorageTypesList()); - } + final StorageType[] storageTypes = convertStorageTypes( + proto.getStorageTypesList(), locs.size()); final int storageIDsCount = proto.getStorageIDsCount(); final String[] storageIDs; @@ -969,6 +964,20 @@ public static BlockCommand convert(BlockCommandProto blkCmd) { targets[i] = PBHelper.convert(targetList.get(i)); } + StorageType[][] targetStorageTypes = new StorageType[targetList.size()][]; + List targetStorageTypesList = blkCmd.getTargetStorageTypesList(); + if (targetStorageTypesList.isEmpty()) { // missing storage types + for(int i = 0; i < targetStorageTypes.length; i++) { + targetStorageTypes[i] = new StorageType[targets[i].length]; + Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT); + } + } else { + for(int i = 0; i < targetStorageTypes.length; i++) { + List p = targetStorageTypesList.get(i).getStorageTypesList(); + targetStorageTypes[i] = p.toArray(new StorageType[p.size()]); + } + } + List targetStorageUuidsList = blkCmd.getTargetStorageUuidsList(); String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][]; for(int i = 0; i < targetStorageIDs.length; i++) { @@ -991,7 +1000,7 @@ public static BlockCommand convert(BlockCommandProto blkCmd) { throw new AssertionError("Unknown action type: " + blkCmd.getAction()); } return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets, - targetStorageIDs); + targetStorageTypes, targetStorageIDs); } public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { @@ -1605,8 +1614,25 @@ private static StorageState convertState(State state) { } } - private static StorageTypeProto convertStorageType( - StorageType type) { + public static List convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List protos = new ArrayList( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { switch(type) { case DISK: return StorageTypeProto.DISK; @@ -1621,7 +1647,7 @@ private static StorageTypeProto convertStorageType( public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelper.convertType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1634,7 +1660,7 @@ private static State convertState(StorageState state) { } } - private static StorageType convertType(StorageTypeProto type) { + public static StorageType convertStorageType(StorageTypeProto type) { switch(type) { case DISK: return StorageType.DISK; @@ -1646,11 +1672,16 @@ private static StorageType 
convertType(StorageTypeProto type) { } } - private static StorageType[] convertStorageTypeProtos( - List storageTypesList) { - final StorageType[] storageTypes = new StorageType[storageTypesList.size()]; - for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelper.convertType(storageTypesList.get(i)); + public static StorageType[] convertStorageTypes( + List storageTypesList, int expectedSize) { + final StorageType[] storageTypes = new StorageType[expectedSize]; + if (storageTypesList.size() != expectedSize) { // missing storage types + Preconditions.checkState(storageTypesList.isEmpty()); + Arrays.fill(storageTypes, StorageType.DEFAULT); + } else { + for (int i = 0; i < storageTypes.length; ++i) { + storageTypes[i] = convertStorageType(storageTypesList.get(i)); + } } return storageTypes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 7482b2fcdc..5dbdd643cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -368,7 +369,7 @@ private void dispatch() { in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - sendRequest(out, eb, accessToken); + sendRequest(out, eb, StorageType.DEFAULT, accessToken); receiveResponse(in); bytesMoved.addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); @@ -400,8 +401,9 @@ private void dispatch() { /* Send a block replace request to the output stream*/ private void sendRequest(DataOutputStream out, ExtendedBlock eb, + StorageType storageType, Token accessToken) throws IOException { - new Sender(out).replaceBlock(eb, accessToken, + new Sender(out).replaceBlock(eb, storageType, accessToken, source.getStorageID(), proxySource.getDatanode()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index cab61364ed..0a6549de8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -575,7 +575,8 @@ private boolean processCommandFromActive(DatanodeCommand cmd, switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), + bcmd.getTargets(), bcmd.getTargetStorageTypes()); dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java 
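Because the new wire fields may be absent, decoding stays compatible with senders that predate storage types. A sketch of how the new PBHelper.convertStorageTypes(list, expectedSize) behaves when a write or transfer op is received:

    // targets were already decoded; expectedSize == targets.length
    StorageType[] types = PBHelper.convertStorageTypes(
        proto.getTargetStorageTypesList(), targets.length);
    // empty proto list   -> filled with StorageType.DEFAULT for every target
    // non-empty list     -> one entry per target (any other size fails the
    //                       Preconditions check in the helper)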
index 3d9ccdcca9..2f3909ba56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -122,7 +123,8 @@ class BlockReceiver implements Closeable { private boolean syncOnClose; private long restartBudget; - BlockReceiver(final ExtendedBlock block, final DataInputStream in, + BlockReceiver(final ExtendedBlock block, final StorageType storageType, + final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -162,11 +164,11 @@ class BlockReceiver implements Closeable { // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block); + replicaInfo = datanode.data.createRbw(storageType, block); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; @@ -198,7 +200,7 @@ class BlockReceiver implements Closeable { case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 5f32e29cd2..b55abed7e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -19,11 +19,66 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; @@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.*; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; @@ -88,22 +171,21 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.*; +import 
org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import javax.management.ObjectName; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -1475,8 +1557,8 @@ int getXmitsInProgress() { return xmitsInProgress.get(); } - private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) - throws IOException { + private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, + StorageType[] xferTargetStorageTypes) throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -1512,16 +1594,17 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][]) { + DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { for (int i = 0; i < blocks.length; i++) { try { - transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], + xferTargetStorageTypes[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1624,6 +1707,7 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) */ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; + final StorageType[] targetStorageTypes; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -1634,7 +1718,8 @@ private class DataTransfer implements Runnable { * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. 
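// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: transferBlock()/transferBlocks()
// now carry a StorageType per target as parallel arrays -- blocks[i] is
// shipped to xferTargets[i], and xferTargetStorageTypes[i][j] names the medium
// that xferTargets[i][j] should place the replica on.  The caller (assumed to
// be the code that unpacks a DNA_TRANSFER block command) has to keep the
// arrays aligned; the helper below is hypothetical and only spells out that
// contract, it is not code from this change.
// ---------------------------------------------------------------------------
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

class TransferArgsCheck {
  /** Fails fast if the parallel arrays of a transfer request disagree. */
  static void checkAligned(Block[] blocks, DatanodeInfo[][] xferTargets,
      StorageType[][] xferTargetStorageTypes) {
    if (blocks.length != xferTargets.length
        || blocks.length != xferTargetStorageTypes.length) {
      throw new IllegalArgumentException("expected one target list and one"
          + " storage-type list per block, got " + blocks.length + "/"
          + xferTargets.length + "/" + xferTargetStorageTypes.length);
    }
    for (int i = 0; i < blocks.length; i++) {
      if (xferTargets[i].length != xferTargetStorageTypes[i].length) {
        throw new IllegalArgumentException("block " + blocks[i]
            + ": expected one storage type per target datanode");
      }
    }
  }
}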
*/ - DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage, + DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, + ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " @@ -1644,6 +1729,7 @@ private class DataTransfer implements Runnable { + ", targests=" + Arrays.asList(targets)); } this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -1702,7 +1788,8 @@ public void run() { false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); - new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); // send data & checksum @@ -2403,7 +2490,8 @@ private void checkReadAccess(final ExtendedBlock block) throws IOException { * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, - final DatanodeInfo[] targets, final String client) throws IOException { + final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2436,7 +2524,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b, b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 7b73090515..5ef6cc7ee2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -45,6 +45,7 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -524,9 +525,11 @@ public void readBlock(final ExtendedBlock block, @Override public void writeBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String clientname, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -590,12 +593,13 @@ public void writeBlock(final ExtendedBlock block, if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = new BlockReceiver(block, in, + blockReceiver = new BlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -636,10 +640,10 @@ public void 
writeBlock(final ExtendedBlock block, HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); - new Sender(mirrorOut).writeBlock(originalBlock, blockToken, - clientname, targets, srcDataNode, stage, pipelineSize, - minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, - cachingStrategy); + new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], + blockToken, clientname, targets, targetStorageTypes, srcDataNode, + stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy); mirrorOut.flush(); @@ -754,7 +758,8 @@ public void writeBlock(final ExtendedBlock block, public void transferBlock(final ExtendedBlock blk, final Token blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; @@ -763,7 +768,8 @@ public void transferBlock(final ExtendedBlock blk, final DataOutputStream out = new DataOutputStream( getOutputStream()); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); + datanode.transferReplicaForPipelineRecovery(blk, targets, + targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); @@ -941,6 +947,7 @@ public void copyBlock(final ExtendedBlock block, @Override public void replaceBlock(final ExtendedBlock block, + final StorageType storageType, final Token blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { @@ -1026,8 +1033,8 @@ public void replaceBlock(final ExtendedBlock block, DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver( - block, proxyReply, proxySock.getRemoteSocketAddress().toString(), + blockReceiver = new BlockReceiver(block, storageType, + proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, CachingStrategy.newDropBehind()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 8eb083a93f..5e4f55e733 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -176,8 +177,8 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createTemporary(StorageType 
storageType, + ExtendedBlock b) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica @@ -186,8 +187,8 @@ public ReplicaInPipelineInterface createTemporary(ExtendedBlock b * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createRbw(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 40d3e81ad6..b068c664fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -17,6 +17,28 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -24,12 +46,37 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.datanode.*; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import 
org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -43,15 +90,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.*; -import java.util.concurrent.Executor; - /************************************************** * FSDataset manages a set of data blocks. Each block * has a unique name and an extent on disk. @@ -736,8 +774,8 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo, } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createRbw(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -746,7 +784,7 @@ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) " and thus cannot be created."); } // create a new block - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a rbw file to hold block in the designated volume File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), @@ -874,8 +912,8 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw( } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipeline createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -883,7 +921,7 @@ public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b) " and thus cannot be created."); } - FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); // create a temporary file to hold block in the designated volume File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 76b3ee8ba9..59a5c9021c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Time; class FsVolumeList { /** @@ -52,11 +56,18 @@ int numberOfFailedVolumes() { * by a single thread and next volume is chosen with no concurrent * update to {@link #volumes}. * @param blockSize free space needed on the volume + * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ - // TODO should choose volume with storage type - synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException { - return blockChooser.chooseVolume(volumes, blockSize); + synchronized FsVolumeImpl getNextVolume(StorageType storageType, + long blockSize) throws IOException { + final List list = new ArrayList(volumes.size()); + for(FsVolumeImpl v : volumes) { + if (v.getStorageType() == storageType) { + list.add(v); + } + } + return blockChooser.chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java index 1798d664f9..f17d702f48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; @@ -50,6 +51,7 @@ public class BlockCommand extends DatanodeCommand { final String poolId; final Block[] blocks; final DatanodeInfo[][] targets; + final StorageType[][] targetStorageTypes; final String[][] targetStorageIDs; /** @@ -62,17 +64,20 @@ public BlockCommand(int action, String poolId, this.poolId = poolId; blocks = new Block[blocktargetlist.size()]; targets = new DatanodeInfo[blocks.length][]; + targetStorageTypes = new StorageType[blocks.length][]; targetStorageIDs = new String[blocks.length][]; for(int i = 0; i < blocks.length; i++) { BlockTargetPair p = blocktargetlist.get(i); blocks[i] = p.block; targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets); + targetStorageTypes[i] = 
DatanodeStorageInfo.toStorageTypes(p.targets); targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets); } } private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {}; + private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {}; private static final String[][] EMPTY_TARGET_STORAGEIDS = {}; /** @@ -81,7 +86,7 @@ public BlockCommand(int action, String poolId, */ public BlockCommand(int action, String poolId, Block blocks[]) { this(action, poolId, blocks, EMPTY_TARGET_DATANODES, - EMPTY_TARGET_STORAGEIDS); + EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS); } /** @@ -89,11 +94,13 @@ public BlockCommand(int action, String poolId, Block blocks[]) { * @param blocks blocks related to the action */ public BlockCommand(int action, String poolId, Block[] blocks, - DatanodeInfo[][] targets, String[][] targetStorageIDs) { + DatanodeInfo[][] targets, StorageType[][] targetStorageTypes, + String[][] targetStorageIDs) { super(action); this.poolId = poolId; this.blocks = blocks; this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.targetStorageIDs = targetStorageIDs; } @@ -109,6 +116,10 @@ public DatanodeInfo[][] getTargets() { return targets; } + public StorageType[][] getTargetStorageTypes() { + return targetStorageTypes; + } + public String[][] getTargetStorageIDs() { return targetStorageIDs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index d6cc3d3777..2afcf057f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -113,6 +113,7 @@ message BlockCommandProto { repeated BlockProto blocks = 3; repeated DatanodeInfosProto targets = 4; repeated StorageUuidsProto targetStorageUuids = 5; + repeated StorageTypesProto targetStorageTypes = 6; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index d914713894..9b4ba339d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -107,17 +107,21 @@ message OpWriteBlockProto { */ required ChecksumProto requestedChecksum = 9; optional CachingStrategyProto cachingStrategy = 10; + optional StorageTypeProto storageType = 11 [default = DISK]; + repeated StorageTypeProto targetStorageTypes = 12; } message OpTransferBlockProto { required ClientOperationHeaderProto header = 1; repeated DatanodeInfoProto targets = 2; + repeated StorageTypeProto targetStorageTypes = 3; } message OpReplaceBlockProto { required BaseHeaderProto header = 1; required string delHint = 2; required DatanodeInfoProto source = 3; + optional StorageTypeProto storageType = 4 [default = DISK]; } message OpCopyBlockProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 6fbb5b3420..8f15be6e92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -136,6 +136,13 @@ enum StorageTypeProto { SSD = 2; } +/** + * A list of storage types. + */ +message StorageTypesProto { + repeated StorageTypeProto storageTypes = 1; +} + /** * A list of storage IDs. 
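// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the new wire fields are either
// "optional ... [default = DISK]" or "repeated", so a writer that predates
// this change and never sets them still produces messages that decode cleanly
// -- the receiver sees DISK for the local replica and an empty per-target
// list.  The snippet uses the classes protoc generates from these .proto
// files; treat it as a compatibility illustration, not code from this change.
// ---------------------------------------------------------------------------
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;

class StorageTypeWireDefaults {
  static String describe(OpWriteBlockProto op) {
    // Field 11 unset -> proto2 hands back the declared default, DISK.
    StorageTypeProto localType = op.getStorageType();
    // Field 12 unset -> an empty repeated field, i.e. no downstream types.
    int downstreamTypes = op.getTargetStorageTypesCount();
    return "local replica on " + localType + ", "
        + downstreamTypes + " downstream storage type(s) supplied";
  }
}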
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index bebb946d27..949713e6f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -380,7 +380,7 @@ public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster, */ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, int racks, int replicas, int neededReplicas) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int curRacks = 0; int curReplicas = 0; int curNeededReplicas = 0; @@ -414,7 +414,7 @@ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b, */ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns, Path file, ExtendedBlock b, int corruptRepls) - throws IOException, TimeoutException, InterruptedException { + throws TimeoutException, InterruptedException { int count = 0; final int ATTEMPTS = 50; int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock()); @@ -839,7 +839,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, // send the request new Sender(out).transferBlock(b, new Token(), - dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); + dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, + new StorageType[]{StorageType.DEFAULT}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 244603eac6..bcb68e9ce7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -125,17 +125,16 @@ private void sendRecvData(String testDescription, throw eof; } - LOG.info("Received: " +new String(retBuf)); - LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray())); + String received = StringUtils.byteToHexString(retBuf); + String expected = StringUtils.byteToHexString(recvBuf.toByteArray()); + LOG.info("Received: " + received); + LOG.info("Expected: " + expected); if (eofExpected) { throw new IOException("Did not recieve IOException when an exception " + "is expected while reading from " + datanode); } - - byte[] needed = recvBuf.toByteArray(); - assertEquals(StringUtils.byteToHexString(needed), - StringUtils.byteToHexString(retBuf)); + assertEquals(expected, received); } finally { IOUtils.closeSocket(sock); } @@ -184,10 +183,7 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, stage, - 0, block.getNumBytes(), block.getNumBytes(), newGS, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(block, stage, newGS, DEFAULT_CHECKSUM); if (eofExcepted) { sendResponse(Status.ERROR, null, null, recvOut); sendRecvData(description, true); @@ -343,10 +339,7 @@ public void testDataTransferProtocol() throws IOException { MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); try { cluster.waitActive(); - DFSClient dfsClient = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), - conf); - datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]; + datanode = cluster.getFileSystem().getDataNodeStats(DatanodeReportType.LIVE)[0]; dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); FileSystem fileSys = cluster.getFileSystem(); @@ -381,23 +374,14 @@ public void testDataTransferProtocol() throws IOException { DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM); Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum(); - sender.writeBlock(new ExtendedBlock(poolId, newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, - 0, 0L, 0L, 0L, - badChecksum, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, newBlockId, badChecksum); recvBuf.reset(); sendResponse(Status.ERROR, null, null, recvOut); sendRecvData("wrong bytesPerChecksum while writing", true); sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); PacketHeader hdr = new PacketHeader( 4, // size of packet @@ -416,11 +400,7 @@ public void testDataTransferProtocol() throws IOException { // test for writing a valid zero size block sendBuf.reset(); recvBuf.reset(); - sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), - BlockTokenSecretManager.DUMMY_TOKEN, "cl", - new DatanodeInfo[1], null, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L, - DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy()); + writeBlock(poolId, ++newBlockId, DEFAULT_CHECKSUM); hdr = new PacketHeader( 8, // size of packet @@ -532,4 +512,18 @@ public void testPacketHeader() throws IOException { assertTrue(hdr.sanityCheck(99)); assertFalse(hdr.sanityCheck(100)); } + + void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException { + writeBlock(new ExtendedBlock(poolId, blockId), + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum); + } + + void writeBlock(ExtendedBlock block, BlockConstructionStage stage, + long newGS, DataChecksum checksum) throws IOException { + sender.writeBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], new StorageType[1], null, stage, + 0, block.getNumBytes(), block.getNumBytes(), newGS, + checksum, CachingStrategy.newDefaultStrategy()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index e527439bc2..6c8547ebf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -550,8 +550,10 @@ public void testConvertBlockCommand() { dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo(); dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo(); String[][] storageIDs = {{"s00"}, {"s10", "s11"}}; + StorageType[][] storageTypes = {{StorageType.DEFAULT}, + {StorageType.DEFAULT, StorageType.DEFAULT}}; BlockCommand bc = new 
BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1", - blocks, dnInfos, storageIDs); + blocks, dnInfos, storageTypes, storageIDs); BlockCommandProto bcProto = PBHelper.convert(bc); BlockCommand bc2 = PBHelper.convert(bcProto); assertEquals(bc.getAction(), bc2.getAction()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index d2f58a877e..9ea6c5186a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -324,7 +325,7 @@ public void blockReport_02() throws IOException { public void blockReport_03() throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); - ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); + writeFile(METHOD_NAME, FILE_SIZE, filePath); // all blocks belong to the same file, hence same BP DataNode dn = cluster.getDataNodes().get(DN_N0); @@ -363,7 +364,7 @@ public void blockReport_04() throws IOException { // Create a bogus new block which will not be present on the namenode. ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(b); + dn.getFSDataset().createRbw(StorageType.DEFAULT, b); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index baf046af84..e3db535002 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -744,14 +744,14 @@ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b) - throws IOException { - return createTemporary(b); + public synchronized ReplicaInPipelineInterface createRbw( + StorageType storageType, ExtendedBlock b) throws IOException { + return createTemporary(storageType, b); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b) - throws IOException { + public synchronized ReplicaInPipelineInterface createTemporary( + StorageType storageType, ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + " is valid, and cannot be written to."); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 4ff942b1f8..a3622a465a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -531,7 +532,7 @@ public void testNoReplicaUnderRecovery() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(block); + dn.data.createRbw(StorageType.DEFAULT, block); try { dn.syncBlock(rBlock, initBlockRecords(dn)); fail("Sync should fail"); @@ -554,7 +555,8 @@ public void testNotMatchedReplicaID() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block); + ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( + StorageType.DEFAULT, block); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index 02faa595e4..478b6d1546 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -264,7 +265,8 @@ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, + new Sender(out).replaceBlock(block, StorageType.DEFAULT, + BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), sourceProxy); out.flush(); // receiveResponse diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index f75bf2465f..798b7b7c70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import 
org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -147,9 +148,9 @@ public void testReplicationError() throws Exception { DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.Type.CRC32, 512); - new Sender(out).writeBlock(block.getBlock(), + new Sender(out).writeBlock(block.getBlock(), StorageType.DEFAULT, BlockTokenSecretManager.DUMMY_TOKEN, "", - new DatanodeInfo[0], null, + new DatanodeInfo[0], new StorageType[0], null, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, checksum, CachingStrategy.newDefaultStrategy()); out.flush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index d03e5ea025..bd6c3de226 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -65,7 +66,8 @@ int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId) ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual // data written - ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b); + ReplicaInPipelineInterface bInfo = fsdataset.createRbw( + StorageType.DEFAULT, b); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index dfefc1e843..b8246c3191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; @@ -147,7 +148,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep }; ReplicaMap replicasMap = dataSet.volumeMap; - FsVolumeImpl vol = dataSet.volumes.getNextVolume(0); + FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0); ReplicaInfo replicaInfo = new FinalizedReplica( blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); replicasMap.add(bpid, replicaInfo); @@ -357,7 +358,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(blocks[FINALIZED]); + dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]); Assert.fail("Should not have created a replica that's already " + 
"finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { @@ -375,7 +376,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(blocks[TEMPORARY]); + dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had created as " + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { @@ -385,7 +386,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw 0L, blocks[RBW].getNumBytes()); // expect to be successful try { - dataSet.createRbw(blocks[RBW]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { @@ -401,7 +402,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(blocks[RWR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { @@ -417,7 +418,7 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw } try { - dataSet.createRbw(blocks[RUR]); + dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { @@ -434,45 +435,45 @@ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throw e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA)); } - dataSet.createRbw(blocks[NON_EXISTENT]); + dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]); } private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { try { - dataSet.createTemporary(blocks[FINALIZED]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]); Assert.fail("Should not have created a temporary replica that was " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[TEMPORARY]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had created as" + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RBW]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RWR]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(blocks[RUR]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { } - dataSet.createTemporary(blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); } }