From 56928873951c4893b1978718bd029e12d49d839e Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Wed, 14 Sep 2016 17:02:11 +0800 Subject: [PATCH] HDFS-10794. [SPS]: Provide storage policy satisfy worker at DN for co-ordinating the block storage movement work. Contributed by Rakesh R --- .../datanode/StoragePolicySatisfyWorker.java | 258 ++++++++++++++++++ .../protocol/BlockStorageMovementCommand.java | 101 +++++++ .../TestStoragePolicySatisfyWorker.java | 160 +++++++++++ 3 files changed, 519 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java new file mode 100644 index 0000000000..6df4e816e7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StoragePolicySatisfyWorker handles the storage policy satisfier commands. + * These commands would be issued from NameNode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling + * BlockStorageMovement commands. + */ +@InterfaceAudience.Private +public class StoragePolicySatisfyWorker { + + private static final Logger LOG = LoggerFactory + .getLogger(StoragePolicySatisfyWorker.class); + + private final DataNode datanode; + private final int ioFileBufferSize; + + private final int moverThreads; + private final ExecutorService moveExecutor; + private final CompletionService moverExecutorCompletionService; + private final List> moverTaskFutures; + + public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { + this.datanode = datanode; + this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); + + moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + moveExecutor = initializeBlockMoverThreadPool(moverThreads); + moverExecutorCompletionService = new ExecutorCompletionService<>( + moveExecutor); + moverTaskFutures = new ArrayList<>(); + // TODO: Needs to manage the number of concurrent moves per DataNode. + } + + private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) { + LOG.debug("Block mover to satisfy storage policy; pool threads={}", num); + + ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("BlockMoverTask-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info("Execution for block movement to satisfy storage policy" + + " got rejected, Executing in current thread"); + // will run in the current thread. + super.rejectedExecution(runnable, e); + } + }); + + moverThreadPool.allowCoreThreadTimeOut(true); + return moverThreadPool; + } + + public void processBlockMovingTasks(long trackID, + List blockMovingInfos) { + Future moveCallable = null; + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + assert blkMovingInfo + .getSources().length == blkMovingInfo.getTargets().length; + + for (int i = 0; i < blkMovingInfo.getSources().length; i++) { + BlockMovingTask blockMovingTask = + new BlockMovingTask(blkMovingInfo.getBlock(), + blkMovingInfo.getSources()[i], + blkMovingInfo.getTargets()[i], + blkMovingInfo.getTargetStorageTypes()[i]); + moveCallable = moverExecutorCompletionService + .submit(blockMovingTask); + moverTaskFutures.add(moveCallable); + } + } + + // TODO: Presently this function act as a blocking call, this has to be + // refined by moving the tracking logic to another tracker thread. + for (int i = 0; i < moverTaskFutures.size(); i++) { + try { + moveCallable = moverExecutorCompletionService.take(); + moveCallable.get(); + } catch (InterruptedException | ExecutionException e) { + // TODO: Failure retries and report back the error to NameNode. + LOG.error("Exception while moving block replica to target storage type", + e); + } + } + } + + /** + * This class encapsulates the process of moving the block replica to the + * given target. + */ + private class BlockMovingTask implements Callable { + private final ExtendedBlock block; + private final DatanodeInfo source; + private final DatanodeInfo target; + private final StorageType targetStorageType; + + BlockMovingTask(ExtendedBlock block, DatanodeInfo source, + DatanodeInfo target, StorageType targetStorageType) { + this.block = block; + this.source = source; + this.target = target; + this.targetStorageType = targetStorageType; + } + + @Override + public Void call() { + moveBlock(); + return null; + } + + private void moveBlock() { + LOG.info("Start moving block {}", block); + + LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy " + + "storageType:{}", block, source, target, targetStorageType); + Socket sock = null; + DataOutputStream out = null; + DataInputStream in = null; + try { + DNConf dnConf = datanode.getDnConf(); + String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname()); + sock = datanode.newSocket(); + NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), + dnConf.getSocketTimeout()); + sock.setSoTimeout(2 * dnConf.getSocketTimeout()); + LOG.debug("Connecting to datanode {}", dnAddr); + + OutputStream unbufOut = sock.getOutputStream(); + InputStream unbufIn = sock.getInputStream(); + + Token accessToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + DataEncryptionKeyFactory keyFactory = datanode + .getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock, + unbufOut, unbufIn, keyFactory, accessToken, target); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream( + new BufferedOutputStream(unbufOut, ioFileBufferSize)); + in = new DataInputStream( + new BufferedInputStream(unbufIn, ioFileBufferSize)); + sendRequest(out, block, accessToken, source, targetStorageType); + receiveResponse(in); + + LOG.debug( + "Successfully moved block:{} from src:{} to destin:{} for" + + " satisfying storageType:{}", + block, source, target, targetStorageType); + } catch (IOException e) { + // TODO: handle failure retries + LOG.warn( + "Failed to move block:{} from src:{} to destin:{} to satisfy " + + "storageType:{}", + block, source, target, targetStorageType, e); + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(sock); + } + } + + /** Send a reportedBlock replace request to the output stream. */ + private void sendRequest(DataOutputStream out, ExtendedBlock eb, + Token accessToken, DatanodeInfo srcDn, + StorageType destinStorageType) throws IOException { + new Sender(out).replaceBlock(eb, destinStorageType, accessToken, + srcDn.getDatanodeUuid(), srcDn); + } + + /** Receive a reportedBlock copy response from the input stream. */ + private void receiveResponse(DataInputStream in) throws IOException { + BlockOpResponseProto response = BlockOpResponseProto + .parseFrom(vintPrefixed(in)); + while (response.getStatus() == Status.IN_PROGRESS) { + // read intermediate responses + response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + } + String logInfo = "reportedBlock move is failed"; + DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java new file mode 100644 index 0000000000..42ba265176 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.util.Arrays; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +/** + * A BlockStorageMovementCommand is an instruction to a DataNode to move the + * given set of blocks to specified target DataNodes to fulfill the block + * storage policy. + * + * Upon receiving this command, this DataNode coordinates all the block movement + * by passing the details to + * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker} + * service. After the block movement this DataNode sends response back to the + * NameNode about the movement status. + */ +public class BlockStorageMovementCommand extends DatanodeCommand { + + // TODO: constructor needs to be refined based on the block movement data + // structure. + BlockStorageMovementCommand(int action) { + super(action); + } + + /** + * Stores block to storage info that can be used for block movement. + */ + public static class BlockMovingInfo { + private ExtendedBlock blk; + private DatanodeInfo[] sourceNodes; + private StorageType[] sourceStorageTypes; + private DatanodeInfo[] targetNodes; + private StorageType[] targetStorageTypes; + + public BlockMovingInfo(ExtendedBlock block, + DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos, + StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) { + this.blk = block; + this.sourceNodes = sourceDnInfos; + this.targetNodes = targetDnInfos; + this.sourceStorageTypes = srcStorageTypes; + this.targetStorageTypes = targetStorageTypes; + } + + public void addBlock(ExtendedBlock block) { + this.blk = block; + } + + public ExtendedBlock getBlock() { + return this.blk; + } + + public DatanodeInfo[] getSources() { + return sourceNodes; + } + + public DatanodeInfo[] getTargets() { + return targetNodes; + } + + public StorageType[] getTargetStorageTypes() { + return targetStorageTypes; + } + + public StorageType[] getSourceStorageTypes() { + return sourceStorageTypes; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockMovingInfo(\n ") + .append("Moving block: ").append(blk).append(" From: ") + .append(Arrays.asList(sourceNodes)).append(" To: [") + .append(Arrays.asList(targetNodes)).append(")\n") + .append(" sourceStorageTypes: ") + .append(Arrays.toString(sourceStorageTypes)) + .append(" targetStorageTypes: ") + .append(Arrays.toString(targetStorageTypes)).toString(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java new file mode 100644 index 0000000000..c7223067e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Supplier; + +/** + * This class tests the behavior of moving block replica to the given storage + * type to fulfill the storage policy requirement. + */ +public class TestStoragePolicySatisfyWorker { + + private static final Logger LOG = LoggerFactory + .getLogger(TestStoragePolicySatisfyWorker.class); + + private static final int DEFAULT_BLOCK_SIZE = 100; + + private static void initConf(Configuration conf) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1L); + conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); + } + + /** + * Tests to verify that the block replica is moving to ARCHIVE storage type to + * fulfill the storage policy requirement. + */ + @Test(timeout = 120000) + public void testMoveSingleBlockToAnotherDatanode() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(4) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}}) + .build(); + try { + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String file = "/testMoveSingleBlockToAnotherDatanode"; + // write to DISK + final FSDataOutputStream out = dfs.create(new Path(file), (short) 2); + out.writeChars("testMoveSingleBlockToAnotherDatanode"); + out.close(); + + // verify before movement + LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + StorageType[] storageTypes = lb.getStorageTypes(); + for (StorageType storageType : storageTypes) { + Assert.assertTrue(StorageType.DISK == storageType); + } + // move to ARCHIVE + dfs.setStoragePolicy(new Path(file), "COLD"); + + lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + DataNode src = cluster.getDataNodes().get(3); + DatanodeInfo targetDnInfo = DFSTestUtil + .getLocalDatanodeInfo(src.getXferPort()); + + // TODO: Need to revisit this when NN is implemented to be able to send + // block moving commands. + StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, + src); + List blockMovingInfos = new ArrayList<>(); + BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo( + lb.getBlock(), lb.getLocations()[0], targetDnInfo, + lb.getStorageTypes()[0], StorageType.ARCHIVE); + blockMovingInfos.add(blockMovingInfo); + INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); + worker.processBlockMovingTasks(inode.getId(), + blockMovingInfos); + cluster.triggerHeartbeats(); + + // Wait till NameNode notified about the block location details + waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000); + } finally { + cluster.shutdown(); + } + } + + private void waitForLocatedBlockWithArchiveStorageType( + final DistributedFileSystem dfs, final String file, + int expectedArchiveCount, int timeout) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LocatedBlock lb = null; + try { + lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + } catch (IOException e) { + LOG.error("Exception while getting located blocks", e); + return false; + } + + int archiveCount = 0; + for (StorageType storageType : lb.getStorageTypes()) { + if (StorageType.ARCHIVE == storageType) { + archiveCount++; + } + } + LOG.info("Archive replica count, expected={} and actual={}", + expectedArchiveCount, archiveCount); + return expectedArchiveCount == archiveCount; + } + }, 100, timeout); + } + + BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block, + DatanodeInfo src, DatanodeInfo destin, StorageType storageType, + StorageType targetStorageType) { + return new BlockMovingInfo(block, new DatanodeInfo[] {src}, + new DatanodeInfo[] {destin}, new StorageType[] {storageType}, + new StorageType[] {targetStorageType}); + } +}