HDFS-6801. Archival Storage: Add a new data migration tool. Contributed by Tsz Wo Nicholas Sze.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1618675 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 2014-08-18 17:51:18 +00:00
parent cb75b6b07d
commit 5d5aae0694
6 changed files with 577 additions and 62 deletions

View File

@@ -180,7 +180,23 @@ public StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, replicationFallbacks);
}
@Override
public int hashCode() {
return Byte.valueOf(id).hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj == null || !(obj instanceof BlockStoragePolicy)) {
return false;
}
final BlockStoragePolicy that = (BlockStoragePolicy)obj;
return this.id == that.id;
}
@Override
public String toString() {
return getClass().getSimpleName() + "{" + name + ":" + id
@@ -193,6 +209,10 @@ public byte getId() {
return id;
}
public String getName() {
return name;
}
private static StorageType getFallback(EnumSet<StorageType> unavailables,
StorageType[] fallbacks) {
for(StorageType fb : fallbacks) {
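The hashCode/equals pair added above makes a storage policy's identity its byte id alone, so policies can safely be used as HashSet elements or HashMap keys. A minimal standalone sketch of the same contract (the Policy class below is hypothetical, not the HDFS BlockStoragePolicy):

import java.util.HashSet;
import java.util.Set;

public class PolicyIdEquality {
  /** Hypothetical stand-in for a policy identified only by a byte id. */
  static class Policy {
    final byte id;
    final String name;
    Policy(byte id, String name) { this.id = id; this.name = name; }

    @Override
    public int hashCode() {
      return Byte.valueOf(id).hashCode();     // same pattern as the diff above
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      } else if (!(obj instanceof Policy)) {  // also covers obj == null
        return false;
      }
      return this.id == ((Policy) obj).id;    // identity is the id only
    }
  }

  public static void main(String[] args) {
    Set<Policy> set = new HashSet<Policy>();
    set.add(new Policy((byte) 12, "HOT"));
    set.add(new Policy((byte) 12, "renamed")); // same id, so treated as a duplicate
    System.out.println(set.size());            // prints 1
  }
}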

View File

@@ -362,6 +362,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 50010;
public static final String DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT;
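A hedged sketch of how the two new mover keys are meant to be consumed once this patch is applied; it uses the same getLong/getInt pattern as the Mover constructor further down, and the override values are examples only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class MoverConfExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Example overrides; the defaults are 5400*1000 ms and 1000 threads as defined above.
    conf.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 30 * 60 * 1000L);
    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 500);

    long movedWinWidth = conf.getLong(
        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
    int moverThreads = conf.getInt(
        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
    System.out.println(movedWinWidth + " ms window, " + moverThreads + " mover threads");
  }
}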

View File

@@ -23,7 +23,6 @@
import java.io.PrintStream;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -54,6 +53,7 @@
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -270,7 +270,7 @@ private long init(List<DatanodeStorageReport> reports) {
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r);
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
@@ -294,7 +294,7 @@ private long init(List<DatanodeStorageReport> reports) {
}
g = s;
} else {
g = dn.addStorageGroup(t, maxSize2Move);
g = dn.addTarget(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
@@ -546,15 +546,10 @@ static int run(Collection<URI> namenodes, final Parameters p,
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
final List<NameNodeConnector> connectors
= new ArrayList<NameNodeConnector>(namenodes.size());
List<NameNodeConnector> connectors = Collections.emptyList();
try {
for (URI uri : namenodes) {
final NameNodeConnector nnc = new NameNodeConnector(
Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf);
boolean done = false;
for(int iteration = 0; !done; iteration++) {
@@ -579,7 +574,7 @@ static int run(Collection<URI> namenodes, final Parameters p,
}
} finally {
for(NameNodeConnector nnc : connectors) {
nnc.close();
IOUtils.cleanup(LOG, nnc);
}
}
return ExitStatus.SUCCESS.getExitCode();
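The change above swaps the hand-rolled connector loop for the new NameNodeConnector.newNameNodeConnectors factory (see the NameNodeConnector diff below) and closes each connector with IOUtils.cleanup, which logs close failures instead of letting them mask the real outcome. A minimal sketch of that lifecycle, assuming a caller-supplied id path and log:

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.io.IOUtils;

public class ConnectorLifecycleSketch {
  private static final Log LOG = LogFactory.getLog(ConnectorLifecycleSketch.class);

  // Hypothetical helper: open connectors for all namenodes, do some work, always clean up.
  static void runOnce(Collection<URI> namenodes, Path idPath, Configuration conf)
      throws Exception {
    List<NameNodeConnector> connectors = Collections.emptyList();
    try {
      connectors = NameNodeConnector.newNameNodeConnectors(
          namenodes, "example", idPath, conf);
      // ... iterate over connectors and schedule work here ...
    } finally {
      for (NameNodeConnector nnc : connectors) {
        IOUtils.cleanup(LOG, nnc); // close quietly, logging any IOException
      }
    }
  }
}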

View File

@@ -49,6 +49,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +104,8 @@ public class Dispatcher {
private final MovedBlocks<StorageGroup> movedBlocks;
/** Map (datanodeUuid,storageType -> StorageGroup) */
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
private final StorageGroupMap<StorageGroup> storageGroupMap
= new StorageGroupMap<StorageGroup>();
private NetworkTopology cluster;
@@ -140,18 +142,18 @@ private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
}
}
static class StorageGroupMap {
public static class StorageGroupMap<G extends StorageGroup> {
private static String toKey(String datanodeUuid, StorageType storageType) {
return datanodeUuid + ":" + storageType;
}
private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
private final Map<String, G> map = new HashMap<String, G>();
StorageGroup get(String datanodeUuid, StorageType storageType) {
public G get(String datanodeUuid, StorageType storageType) {
return map.get(toKey(datanodeUuid, storageType));
}
void put(StorageGroup g) {
public void put(G g) {
final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
final StorageGroup existing = map.put(key, g);
Preconditions.checkState(existing == null);
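Making StorageGroupMap generic lets the Mover keep Source objects and plain target StorageGroups in two separate, type-safe maps. The underlying idea is just a HashMap keyed by the composite string "datanodeUuid:storageType"; a self-contained sketch with hypothetical types:

import java.util.HashMap;
import java.util.Map;

public class CompositeKeyMapSketch {
  enum StorageType { DISK, SSD, ARCHIVE }

  /** Hypothetical value type standing in for a StorageGroup. */
  static class SimpleGroup {
    final String datanodeUuid;
    final StorageType type;
    SimpleGroup(String uuid, StorageType type) { this.datanodeUuid = uuid; this.type = type; }
  }

  /** Generic map keyed by "uuid:type", mirroring the diff's StorageGroupMap<G>. */
  static class GroupMap<G extends SimpleGroup> {
    private final Map<String, G> map = new HashMap<String, G>();

    private static String toKey(String uuid, StorageType type) {
      return uuid + ":" + type;
    }
    public void put(G g) {
      map.put(toKey(g.datanodeUuid, g.type), g);
    }
    public G get(String uuid, StorageType type) {
      return map.get(toKey(uuid, type));
    }
  }

  public static void main(String[] args) {
    GroupMap<SimpleGroup> targets = new GroupMap<SimpleGroup>();
    targets.put(new SimpleGroup("dn-1", StorageType.ARCHIVE));
    System.out.println(targets.get("dn-1", StorageType.ARCHIVE) != null); // true
  }
}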
@@ -167,7 +169,7 @@ void clear() {
}
/** This class keeps track of a scheduled block move */
private class PendingMove {
public class PendingMove {
private DBlock block;
private Source source;
private DDatanode proxySource;
@@ -176,6 +178,12 @@ private class PendingMove {
private PendingMove() {
}
public PendingMove(DBlock block, Source source, StorageGroup target) {
this.block = block;
this.source = source;
this.target = target;
}
@Override
public String toString() {
final Block b = block.getBlock();
@@ -227,7 +235,7 @@ private boolean markMovedIfGoodBlock(DBlock block) {
*
* @return true if a proxy is found; otherwise false
*/
private boolean chooseProxySource() {
public boolean chooseProxySource() {
final DatanodeInfo targetDN = target.getDatanodeInfo();
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
@@ -356,8 +364,8 @@ private void reset() {
}
/** A class for keeping track of block locations in the dispatcher. */
private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
DBlock(Block block) {
public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
public DBlock(Block block) {
super(block);
}
}
@@ -378,10 +386,10 @@ long getSize() {
}
/** A class that keeps track of a datanode. */
static class DDatanode {
public static class DDatanode {
/** A group of storages in a datanode with the same storage type. */
class StorageGroup {
public class StorageGroup {
final StorageType storageType;
final long maxSize2Move;
private long scheduledSize = 0L;
@@ -390,18 +398,26 @@ private StorageGroup(StorageType storageType, long maxSize2Move) {
this.storageType = storageType;
this.maxSize2Move = maxSize2Move;
}
public StorageType getStorageType() {
return storageType;
}
private DDatanode getDDatanode() {
return DDatanode.this;
}
DatanodeInfo getDatanodeInfo() {
public DatanodeInfo getDatanodeInfo() {
return DDatanode.this.datanode;
}
/** Decide if still need to move more bytes */
synchronized boolean hasSpaceForScheduling() {
return availableSizeToMove() > 0L;
boolean hasSpaceForScheduling() {
return hasSpaceForScheduling(0L);
}
synchronized boolean hasSpaceForScheduling(long size) {
return availableSizeToMove() > size;
}
/** @return the total number of bytes that need to be moved */
@@ -410,7 +426,7 @@ synchronized long availableSizeToMove() {
}
/** increment scheduled size */
synchronized void incScheduledSize(long size) {
public synchronized void incScheduledSize(long size) {
scheduledSize += size;
}
@@ -436,7 +452,9 @@ public String toString() {
}
final DatanodeInfo datanode;
final EnumMap<StorageType, StorageGroup> storageMap
private final EnumMap<StorageType, Source> sourceMap
= new EnumMap<StorageType, Source>(StorageType.class);
private final EnumMap<StorageType, StorageGroup> targetMap
= new EnumMap<StorageType, StorageGroup>(StorageType.class);
protected long delayUntil = 0L;
/** blocks being moved but not confirmed yet */
@@ -445,29 +463,34 @@ public String toString() {
@Override
public String toString() {
return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
return getClass().getSimpleName() + ":" + datanode;
}
private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
this.datanode = r.getDatanodeInfo();
private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
this.datanode = datanode;
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
}
private void put(StorageType storageType, StorageGroup g) {
final StorageGroup existing = storageMap.put(storageType, g);
public DatanodeInfo getDatanodeInfo() {
return datanode;
}
private static <G extends StorageGroup> void put(StorageType storageType,
G g, EnumMap<StorageType, G> map) {
final StorageGroup existing = map.put(storageType, g);
Preconditions.checkState(existing == null);
}
StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
put(storageType, g);
put(storageType, g, targetMap);
return g;
}
Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
final Source s = d.new Source(storageType, maxSize2Move, this);
put(storageType, s);
put(storageType, s, sourceMap);
return s;
}
@@ -508,7 +531,7 @@ synchronized boolean removePendingBlock(PendingMove pendingBlock) {
}
/** A node that can be the sources of a block move */
class Source extends DDatanode.StorageGroup {
public class Source extends DDatanode.StorageGroup {
private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
@@ -654,13 +677,7 @@ private void dispatchBlocks() {
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
final PendingMove p = chooseNextMove();
if (p != null) {
// move the block
moveExecutor.execute(new Runnable() {
@Override
public void run() {
p.dispatch();
}
});
executePendingMove(p);
continue;
}
@@ -716,7 +733,8 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.cluster = NetworkTopology.getInstance(conf);
this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
this.dispatchExecutor = dispatcherThreads == 0? null
: Executors.newFixedThreadPool(dispatcherThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
@@ -727,11 +745,15 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
}
StorageGroupMap getStorageGroupMap() {
public DistributedFileSystem getDistributedFileSystem() {
return nnc.getDistributedFileSystem();
}
public StorageGroupMap<StorageGroup> getStorageGroupMap() {
return storageGroupMap;
}
NetworkTopology getCluster() {
public NetworkTopology getCluster() {
return cluster;
}
@@ -779,7 +801,7 @@ private boolean shouldIgnore(DatanodeInfo dn) {
}
/** Get live datanode storage reports and then build the network topology. */
List<DatanodeStorageReport> init() throws IOException {
public List<DatanodeStorageReport> init() throws IOException {
final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
// create network topology and classify utilization collections:
@@ -795,8 +817,18 @@ List<DatanodeStorageReport> init() throws IOException {
return trimmed;
}
public DDatanode newDatanode(DatanodeStorageReport r) {
return new DDatanode(r, maxConcurrentMovesPerNode);
public DDatanode newDatanode(DatanodeInfo datanode) {
return new DDatanode(datanode, maxConcurrentMovesPerNode);
}
public void executePendingMove(final PendingMove p) {
// move the block
moveExecutor.execute(new Runnable() {
@Override
public void run() {
p.dispatch();
}
});
}
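executePendingMove simply hands each scheduled move to the shared moveExecutor pool, which is what lets the Mover dispatch moves directly without the balancer's dispatcher threads. A minimal sketch of the same submit-a-Runnable pattern on a plain ExecutorService (names are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorSubmitSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService moveExecutor = Executors.newFixedThreadPool(4);

    // Stand-in for PendingMove.dispatch(): each task runs on a pool thread.
    for (int i = 0; i < 3; i++) {
      final int id = i;
      moveExecutor.execute(new Runnable() {
        @Override
        public void run() {
          System.out.println("dispatching move " + id
              + " on " + Thread.currentThread().getName());
        }
      });
    }

    moveExecutor.shutdown();                       // stop accepting new tasks
    moveExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }
}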
public boolean dispatchAndCheckContinue() throws InterruptedException {
@@ -869,6 +901,12 @@ private void waitForMoveCompletion() {
}
}
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
DBlock block) {
// match source and target storage type
return isGoodBlockCandidate(source, target, source.getStorageType(), block);
}
/**
* Decide if the block is a good candidate to be moved from source to target.
* A block is a good candidate if
@@ -876,9 +914,12 @@ private void waitForMoveCompletion() {
* 2. the block does not have a replica on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
private boolean isGoodBlockCandidate(Source source, StorageGroup target,
DBlock block) {
if (source.storageType != target.storageType) {
public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
StorageType targetStorageType, DBlock block) {
if (target.storageType != targetStorageType) {
return false;
}
if (!target.hasSpaceForScheduling(block.getNumBytes())) {
return false;
}
// check if the block is moved or not
@@ -889,7 +930,7 @@ private boolean isGoodBlockCandidate(Source source, StorageGroup target,
return false;
}
if (cluster.isNodeGroupAware()
&& isOnSameNodeGroupWithReplicas(target, block, source)) {
&& isOnSameNodeGroupWithReplicas(source, target, block)) {
return false;
}
if (reduceNumOfRacks(source, target, block)) {
@@ -902,7 +943,7 @@ && isOnSameNodeGroupWithReplicas(target, block, source)) {
* Determine whether moving the given block replica from source to target
* would reduce the number of racks of the block replicas.
*/
private boolean reduceNumOfRacks(Source source, StorageGroup target,
private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
DBlock block) {
final DatanodeInfo sourceDn = source.getDatanodeInfo();
if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
@@ -939,8 +980,8 @@ private boolean reduceNumOfRacks(Source source, StorageGroup target,
* @return true if there are any replica (other than source) on the same node
* group with target
*/
private boolean isOnSameNodeGroupWithReplicas(
StorageGroup target, DBlock block, Source source) {
private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
StorageGroup target, DBlock block) {
final DatanodeInfo targetDn = target.getDatanodeInfo();
for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
@@ -961,7 +1002,7 @@ void reset(Configuration conf) {
}
/** shutdown thread pools */
void shutdownNow() {
public void shutdownNow() {
dispatchExecutor.shutdownNow();
moveExecutor.shutdownNow();
}

View File

@@ -23,6 +23,9 @@
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -51,6 +55,20 @@ public class NameNodeConnector implements Closeable {
private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
/** Create {@link NameNodeConnector} for the given namenodes. */
public static List<NameNodeConnector> newNameNodeConnectors(
Collection<URI> namenodes, String name, Path idPath, Configuration conf)
throws IOException {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size());
for (URI uri : namenodes) {
NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
return connectors;
}
private final URI nameNodeUri;
private final String blockpoolID;
@@ -59,7 +77,7 @@ public class NameNodeConnector implements Closeable {
private final ClientProtocol client;
private final KeyManager keyManager;
private final FileSystem fs;
private final DistributedFileSystem fs;
private final Path idPath;
private final OutputStream out;
@@ -74,7 +92,7 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
NamenodeProtocol.class).getProxy();
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class).getProxy();
this.fs = FileSystem.get(nameNodeUri, conf);
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
@@ -89,6 +107,10 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
}
}
public DistributedFileSystem getDistributedFileSystem() {
return fs;
}
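Typing the field as DistributedFileSystem and casting the FileSystem.get result is only safe when the namenode URI actually resolves to HDFS, which is the patch's assumption. A hedged sketch of a defensive variant of that cast (not what the patch does):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class DfsCastSketch {
  // Hypothetical helper: fail fast with a clear message instead of a ClassCastException.
  static DistributedFileSystem getDfs(URI uri, Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(uri, conf);
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IOException(uri + " is not an HDFS file system (got "
          + fs.getClass().getName() + ")");
    }
    return (DistributedFileSystem) fs;
  }
}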
/** @return the block pool ID */
public String getBlockpoolID() {
return blockpoolID;

View File

@@ -0,0 +1,431 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.mover;
import java.io.IOException;
import java.net.URI;
import java.text.DateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.StorageGroupMap;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@InterfaceAudience.Private
public class Mover {
static final Log LOG = LogFactory.getLog(Mover.class);
private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
private static class StorageMap {
private final StorageGroupMap<Source> sources
= new StorageGroupMap<Source>();
private final StorageGroupMap<StorageGroup> targets
= new StorageGroupMap<StorageGroup>();
private final EnumMap<StorageType, List<StorageGroup>> targetStorageTypeMap
= new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
private StorageMap() {
for(StorageType t : StorageType.asList()) {
targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
}
}
private void add(Source source, StorageGroup target) {
sources.put(source);
targets.put(target);
getTargetStorages(target.getStorageType()).add(target);
}
private Source getSource(MLocation ml) {
return get(sources, ml);
}
private StorageGroup getTarget(MLocation ml) {
return get(targets, ml);
}
private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
return map.get(ml.datanode.getDatanodeUuid(), ml.storageType);
}
private List<StorageGroup> getTargetStorages(StorageType t) {
return targetStorageTypeMap.get(t);
}
}
private final Dispatcher dispatcher;
private final StorageMap storages;
private final BlockStoragePolicy.Suite blockStoragePolicies;
Mover(NameNodeConnector nnc, Configuration conf) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
final int moverThreads = conf.getInt(
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
maxConcurrentMovesPerNode, conf);
this.storages = new StorageMap();
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
}
private ExitStatus run() {
try {
final List<DatanodeStorageReport> reports = dispatcher.init();
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
for(StorageType t : StorageType.asList()) {
final long maxRemaining = getMaxRemaining(r, t);
if (maxRemaining > 0L) {
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
final StorageGroup target = dn.addTarget(t, maxRemaining);
storages.add(source, target);
}
}
}
new Processor().processNamespace();
return ExitStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION;
} finally {
dispatcher.shutdownNow();
}
}
private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
long max = 0L;
for(StorageReport r : report.getStorageReports()) {
if (r.getStorage().getStorageType() == t) {
if (r.getRemaining() > max) {
max = r.getRemaining();
}
}
}
return max;
}
private class Processor {
private final DFSClient dfs;
private Processor() {
dfs = dispatcher.getDistributedFileSystem().getClient();
}
private void processNamespace() {
try {
processDirRecursively("", dfs.getFileInfo("/"));
} catch (IOException e) {
LOG.warn("Failed to get root directory status. Ignore and continue.", e);
}
}
private void processDirRecursively(String parent, HdfsFileStatus status) {
if (status.isSymlink()) {
return; //ignore symlinks
} else if (status.isDir()) {
String dir = status.getFullName(parent);
if (!dir.endsWith(Path.SEPARATOR)) {
dir = dir + Path.SEPARATOR;
}
for(byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
children = dfs.listPaths(dir, lastReturnedName, true);
} catch(IOException e) {
LOG.warn("Failed to list directory " + dir
+ ". Ignore the directory and continue.", e);
return;
}
if (children == null) {
return;
}
for (HdfsFileStatus child : children.getPartialListing()) {
processDirRecursively(dir, child);
}
if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
return;
}
}
} else { // file
processFile(parent, (HdfsLocatedFileStatus)status);
}
}
private void processFile(String parent, HdfsLocatedFileStatus status) {
final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
status.getStoragePolicy());
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
final LocatedBlocks locations = status.getBlockLocations();
for(LocatedBlock lb : locations.getLocatedBlocks()) {
final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes());
if (!diff.removeOverlap()) {
scheduleMoves4Block(diff, lb);
}
}
}
void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
final List<MLocation> locations = MLocation.toLocations(lb);
Collections.shuffle(locations);
final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
for(MLocation ml : locations) {
db.addLocation(storages.getTarget(ml));
}
for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
final MLocation ml = j.next();
final Source source = storages.getSource(ml);
if (ml.storageType == t) {
// try to schedule replica move.
if (scheduleMoveReplica(db, ml, source, diff.expected)) {
i.remove();
j.remove();
}
}
}
}
}
boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
List<StorageType> targetTypes) {
if (dispatcher.getCluster().isNodeGroupAware()) {
if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
return true;
}
}
// Then, match nodes on the same rack
if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
return true;
}
// At last, match all remaining nodes
if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
return true;
}
return false;
}
boolean chooseTarget(DBlock db, MLocation ml, Source source,
List<StorageType> targetTypes, Matcher matcher) {
final NetworkTopology cluster = dispatcher.getCluster();
for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
for(StorageGroup target : storages.getTargetStorages(t)) {
if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
&& dispatcher.isGoodBlockCandidate(source, target, t, db)) {
final PendingMove pm = dispatcher.new PendingMove(db, source, target);
if (pm.chooseProxySource()) {
i.remove();
target.incScheduledSize(ml.size);
dispatcher.executePendingMove(pm);
return true;
}
}
}
}
return false;
}
}
static class MLocation {
final DatanodeInfo datanode;
final StorageType storageType;
final long size;
MLocation(DatanodeInfo datanode, StorageType storageType, long size) {
this.datanode = datanode;
this.storageType = storageType;
this.size = size;
}
static List<MLocation> toLocations(LocatedBlock lb) {
final DatanodeInfo[] datanodeInfos = lb.getLocations();
final StorageType[] storageTypes = lb.getStorageTypes();
final long size = lb.getBlockSize();
final List<MLocation> locations = new LinkedList<MLocation>();
for(int i = 0; i < datanodeInfos.length; i++) {
locations.add(new MLocation(datanodeInfos[i], storageTypes[i], size));
}
return locations;
}
}
private static class StorageTypeDiff {
final List<StorageType> expected;
final List<StorageType> existing;
StorageTypeDiff(List<StorageType> expected, StorageType[] existing) {
this.expected = new LinkedList<StorageType>(expected);
this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
}
/**
* Remove the overlap between the expected types and the existing types.
* @return true if the existing types list is empty after the overlap is removed.
*/
boolean removeOverlap() {
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
final StorageType t = i.next();
if (expected.remove(t)) {
i.remove();
}
}
return existing.isEmpty();
}
}
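removeOverlap computes a multiset difference: each existing storage type cancels one matching expected type, and a block needs no move only when nothing is left on the existing side. A small worked sketch of the same logic on plain lists (the Diff class here is illustrative, not the inner StorageTypeDiff):

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class StorageTypeDiffSketch {
  enum StorageType { DISK, SSD, ARCHIVE }

  static class Diff {
    final List<StorageType> expected;
    final List<StorageType> existing;
    Diff(List<StorageType> expected, List<StorageType> existing) {
      this.expected = new LinkedList<StorageType>(expected);
      this.existing = new LinkedList<StorageType>(existing);
    }
    /** Cancel matching pairs; return true if nothing remains to be moved. */
    boolean removeOverlap() {
      for (Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
        if (expected.remove(i.next())) {
          i.remove();
        }
      }
      return existing.isEmpty();
    }
  }

  public static void main(String[] args) {
    // The policy wants one DISK and two ARCHIVE replicas, but all three replicas sit on DISK.
    Diff d = new Diff(
        Arrays.asList(StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE),
        Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.DISK));
    System.out.println(d.removeOverlap()); // false: moves are still needed
    System.out.println(d.expected);        // [ARCHIVE, ARCHIVE]
    System.out.println(d.existing);        // [DISK, DISK]
  }
}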
static int run(Collection<URI> namenodes, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
LOG.info("namenodes = " + namenodes);
List<NameNodeConnector> connectors = Collections.emptyList();
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
while (true) {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Mover m = new Mover(nnc, conf);
final ExitStatus r = m.run();
if (r != ExitStatus.IN_PROGRESS) {
// must be an error status, return.
return r.getExitCode();
}
}
Thread.sleep(sleeptime);
}
} finally {
for(NameNodeConnector nnc : connectors) {
IOUtils.cleanup(LOG, nnc);
}
}
}
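Between full passes over the namenodes, the loop above sleeps for two heartbeat intervals. A small sketch of that arithmetic, assuming the stock configuration where dfs.heartbeat.interval defaults to 3 seconds:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class MoverSleepTimeSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // dfs.heartbeat.interval is in seconds; the loop sleeps 2 heartbeats, in milliseconds.
    long sleeptime = 2000 * conf.getLong(
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
    System.out.println(sleeptime + " ms between passes"); // 6000 with the default 3 s interval
  }
}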
static class Cli extends Configured implements Tool {
private static final String USAGE = "Usage: java "
+ Mover.class.getSimpleName();
@Override
public int run(String[] args) throws Exception {
final long startTime = Time.monotonicNow();
final Configuration conf = getConf();
try {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
return Mover.run(namenodes, conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED.getExitCode();
} finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));
}
}
/**
* Run a Mover in command line.
*
* @param args Command line arguments
*/
public static void main(String[] args) {
if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
System.exit(0);
}
try {
System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
} catch (Throwable e) {
LOG.error("Exiting " + Mover.class.getSimpleName()
+ " due to an exception", e);
System.exit(-1);
}
}
}
}