HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to standalone classes and separate KeyManager from NameNodeConnector.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616422 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
be6360593b
commit
83b9933db3
@ -360,6 +360,10 @@ Release 2.6.0 - UNRELEASED
|
||||
|
||||
HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh)
|
||||
|
||||
HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to
|
||||
standalone classes and separate KeyManager from NameNodeConnector.
|
||||
(szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
@ -58,6 +58,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
@ -85,7 +86,6 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.HostsFileReader;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
@ -195,10 +195,12 @@
|
||||
@InterfaceAudience.Private
|
||||
public class Balancer {
|
||||
static final Log LOG = LogFactory.getLog(Balancer.class);
|
||||
final private static long GB = 1L << 30; //1GB
|
||||
final private static long MAX_SIZE_TO_MOVE = 10*GB;
|
||||
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
|
||||
private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
|
||||
|
||||
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||
|
||||
private static final long GB = 1L << 30; //1GB
|
||||
private static final long MAX_SIZE_TO_MOVE = 10*GB;
|
||||
private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
|
||||
|
||||
/** The maximum number of concurrent blocks moves for
|
||||
* balancing purpose at a datanode
|
||||
@ -219,6 +221,8 @@ public class Balancer {
|
||||
+ "\tIncludes only the specified datanodes.";
|
||||
|
||||
private final NameNodeConnector nnc;
|
||||
private final KeyManager keyManager;
|
||||
|
||||
private final BalancingPolicy policy;
|
||||
private final SaslDataTransferClient saslClient;
|
||||
private final double threshold;
|
||||
@ -241,7 +245,8 @@ public class Balancer {
|
||||
|
||||
private final Map<Block, BalancerBlock> globalBlockList
|
||||
= new HashMap<Block, BalancerBlock>();
|
||||
private final MovedBlocks movedBlocks = new MovedBlocks();
|
||||
private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
|
||||
|
||||
/** Map (datanodeUuid,storageType -> StorageGroup) */
|
||||
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
|
||||
|
||||
@ -326,7 +331,7 @@ private boolean markMovedIfGoodBlock(BalancerBlock block) {
|
||||
if (isGoodBlockCandidate(source, target, block)) {
|
||||
this.block = block;
|
||||
if ( chooseProxySource() ) {
|
||||
movedBlocks.add(block);
|
||||
movedBlocks.put(block);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Decided to move " + this);
|
||||
}
|
||||
@ -399,10 +404,10 @@ private void dispatch() {
|
||||
|
||||
OutputStream unbufOut = sock.getOutputStream();
|
||||
InputStream unbufIn = sock.getInputStream();
|
||||
ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
|
||||
Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
|
||||
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
|
||||
Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
|
||||
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
|
||||
unbufIn, nnc, accessToken, target.getDatanode());
|
||||
unbufIn, keyManager, accessToken, target.getDatanode());
|
||||
unbufOut = saslStreams.out;
|
||||
unbufIn = saslStreams.in;
|
||||
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
|
||||
@ -483,47 +488,9 @@ public void run() {
|
||||
}
|
||||
|
||||
/* A class for keeping track of blocks in the Balancer */
|
||||
static private class BalancerBlock {
|
||||
private final Block block; // the block
|
||||
/** The locations of the replicas of the block. */
|
||||
private final List<BalancerDatanode.StorageGroup> locations
|
||||
= new ArrayList<BalancerDatanode.StorageGroup>(3);
|
||||
|
||||
/* Constructor */
|
||||
private BalancerBlock(Block block) {
|
||||
this.block = block;
|
||||
}
|
||||
|
||||
/* clean block locations */
|
||||
private synchronized void clearLocations() {
|
||||
locations.clear();
|
||||
}
|
||||
|
||||
/* add a location */
|
||||
private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
|
||||
if (!locations.contains(g)) {
|
||||
locations.add(g);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return if the block is located on the given storage group. */
|
||||
private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
|
||||
return locations.contains(g);
|
||||
}
|
||||
|
||||
/* Return its locations */
|
||||
private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
|
||||
return locations;
|
||||
}
|
||||
|
||||
/* Return the block */
|
||||
private Block getBlock() {
|
||||
return block;
|
||||
}
|
||||
|
||||
/* Return the length of the block */
|
||||
private long getNumBytes() {
|
||||
return block.getNumBytes();
|
||||
static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
|
||||
BalancerBlock(Block block) {
|
||||
super(block);
|
||||
}
|
||||
}
|
||||
|
||||
@ -735,7 +702,7 @@ private Iterator<BalancerBlock> getBlockIterator() {
|
||||
*/
|
||||
private long getBlockList() throws IOException {
|
||||
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
||||
final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
|
||||
final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
|
||||
getDatanode(), size).getBlocks();
|
||||
|
||||
long bytesReceived = 0;
|
||||
@ -819,7 +786,7 @@ private PendingBlockMove chooseNextBlockToMove() {
|
||||
private void filterMovedBlocks() {
|
||||
for (Iterator<BalancerBlock> blocks=getBlockIterator();
|
||||
blocks.hasNext();) {
|
||||
if (movedBlocks.contains(blocks.next())) {
|
||||
if (movedBlocks.contains(blocks.next().getBlock())) {
|
||||
blocks.remove();
|
||||
}
|
||||
}
|
||||
@ -925,6 +892,13 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
|
||||
this.nodesToBeExcluded = p.nodesToBeExcluded;
|
||||
this.nodesToBeIncluded = p.nodesToBeIncluded;
|
||||
this.nnc = theblockpool;
|
||||
this.keyManager = nnc.getKeyManager();
|
||||
|
||||
final long movedWinWidth = conf.getLong(
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||
movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
|
||||
|
||||
cluster = NetworkTopology.getInstance(conf);
|
||||
|
||||
this.moverExecutor = Executors.newFixedThreadPool(
|
||||
@ -1094,36 +1068,6 @@ void logUtilizationCollection(String name, Collection<T> items) {
|
||||
LOG.info(items.size() + " " + name + ": " + items);
|
||||
}
|
||||
|
||||
/** A matcher interface for matching nodes. */
|
||||
private interface Matcher {
|
||||
/** Given the cluster topology, does the left node match the right node? */
|
||||
boolean match(NetworkTopology cluster, Node left, Node right);
|
||||
}
|
||||
|
||||
/** Match datanodes in the same node group. */
|
||||
static final Matcher SAME_NODE_GROUP = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameNodeGroup(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match datanodes in the same rack. */
|
||||
static final Matcher SAME_RACK = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameRack(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match any datanode with any other datanode. */
|
||||
static final Matcher ANY_OTHER = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return left != right;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Decide all <source, target> pairs and
|
||||
* the number of bytes to move from a source to a target
|
||||
@ -1134,13 +1078,13 @@ public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
private long chooseStorageGroups() {
|
||||
// First, match nodes on the same node group if cluster is node group aware
|
||||
if (cluster.isNodeGroupAware()) {
|
||||
chooseStorageGroups(SAME_NODE_GROUP);
|
||||
chooseStorageGroups(Matcher.SAME_NODE_GROUP);
|
||||
}
|
||||
|
||||
// Then, match nodes on the same rack
|
||||
chooseStorageGroups(SAME_RACK);
|
||||
chooseStorageGroups(Matcher.SAME_RACK);
|
||||
// At last, match all remaining nodes
|
||||
chooseStorageGroups(ANY_OTHER);
|
||||
chooseStorageGroups(Matcher.ANY_OTHER);
|
||||
|
||||
Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
|
||||
"Mismatched number of datanodes (" + storageGroupMap.size() + " < "
|
||||
@ -1307,56 +1251,6 @@ private void waitForMoveCompletion() {
|
||||
} while (shouldWait);
|
||||
}
|
||||
|
||||
/** This window makes sure to keep blocks that have been moved within 1.5 hour.
|
||||
* Old window has blocks that are older;
|
||||
* Current window has blocks that are more recent;
|
||||
* Cleanup method triggers the check if blocks in the old window are
|
||||
* more than 1.5 hour old. If yes, purge the old window and then
|
||||
* move blocks in current window to old window.
|
||||
*/
|
||||
private static class MovedBlocks {
|
||||
private long lastCleanupTime = Time.now();
|
||||
final private static int CUR_WIN = 0;
|
||||
final private static int OLD_WIN = 1;
|
||||
final private static int NUM_WINS = 2;
|
||||
final private List<HashMap<Block, BalancerBlock>> movedBlocks =
|
||||
new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
|
||||
|
||||
/* initialize the moved blocks collection */
|
||||
private MovedBlocks() {
|
||||
movedBlocks.add(new HashMap<Block,BalancerBlock>());
|
||||
movedBlocks.add(new HashMap<Block,BalancerBlock>());
|
||||
}
|
||||
|
||||
/* add a block thus marking a block to be moved */
|
||||
synchronized private void add(BalancerBlock block) {
|
||||
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
|
||||
}
|
||||
|
||||
/* check if a block is marked as moved */
|
||||
synchronized private boolean contains(BalancerBlock block) {
|
||||
return contains(block.getBlock());
|
||||
}
|
||||
|
||||
/* check if a block is marked as moved */
|
||||
synchronized private boolean contains(Block block) {
|
||||
return movedBlocks.get(CUR_WIN).containsKey(block) ||
|
||||
movedBlocks.get(OLD_WIN).containsKey(block);
|
||||
}
|
||||
|
||||
/* remove old blocks */
|
||||
synchronized private void cleanup() {
|
||||
long curTime = Time.now();
|
||||
// check if old win is older than winWidth
|
||||
if (lastCleanupTime + WIN_WIDTH <= curTime) {
|
||||
// purge the old window
|
||||
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
|
||||
movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
|
||||
lastCleanupTime = curTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Decide if it is OK to move the given block from source to target
|
||||
* A block is a good candidate if
|
||||
* 1. the block is not in the process of being moved/has not been moved;
|
||||
@ -1369,7 +1263,7 @@ private boolean isGoodBlockCandidate(Source source,
|
||||
return false;
|
||||
}
|
||||
// check if the block is moved or not
|
||||
if (movedBlocks.contains(block)) {
|
||||
if (movedBlocks.contains(block.getBlock())) {
|
||||
return false;
|
||||
}
|
||||
if (block.isLocatedOn(target)) {
|
||||
@ -1387,7 +1281,7 @@ private boolean isGoodBlockCandidate(Source source,
|
||||
} else {
|
||||
boolean notOnSameRack = true;
|
||||
synchronized (block) {
|
||||
for (BalancerDatanode.StorageGroup loc : block.locations) {
|
||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
||||
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
|
||||
notOnSameRack = false;
|
||||
break;
|
||||
@ -1399,7 +1293,7 @@ private boolean isGoodBlockCandidate(Source source,
|
||||
goodBlock = true;
|
||||
} else {
|
||||
// good if source is on the same rack as on of the replicas
|
||||
for (BalancerDatanode.StorageGroup loc : block.locations) {
|
||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
||||
if (loc != source &&
|
||||
cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
|
||||
goodBlock = true;
|
||||
@ -1425,7 +1319,7 @@ private boolean isGoodBlockCandidate(Source source,
|
||||
private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
|
||||
BalancerBlock block, Source source) {
|
||||
final DatanodeInfo targetDn = target.getDatanode();
|
||||
for (BalancerDatanode.StorageGroup loc : block.locations) {
|
||||
for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
|
||||
if (loc != source &&
|
||||
cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
|
||||
return true;
|
||||
@ -1489,7 +1383,7 @@ private ReturnStatus run(int iteration, Formatter formatter,
|
||||
* decide the number of bytes need to be moved
|
||||
*/
|
||||
final long bytesLeftToMove = init(
|
||||
nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
|
||||
nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
|
||||
if (bytesLeftToMove == 0) {
|
||||
System.out.println("The cluster is balanced. Exiting...");
|
||||
return ReturnStatus.SUCCESS;
|
||||
@ -1558,8 +1452,8 @@ static int run(Collection<URI> namenodes, final Parameters p,
|
||||
final long sleeptime = 2000*conf.getLong(
|
||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
|
||||
LOG.info("namenodes = " + namenodes);
|
||||
LOG.info("p = " + p);
|
||||
LOG.info("namenodes = " + namenodes);
|
||||
LOG.info("parameters = " + p);
|
||||
|
||||
final Formatter formatter = new Formatter(System.out);
|
||||
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
||||
@ -1568,7 +1462,10 @@ static int run(Collection<URI> namenodes, final Parameters p,
|
||||
= new ArrayList<NameNodeConnector>(namenodes.size());
|
||||
try {
|
||||
for (URI uri : namenodes) {
|
||||
connectors.add(new NameNodeConnector(uri, conf));
|
||||
final NameNodeConnector nnc = new NameNodeConnector(
|
||||
Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
|
||||
nnc.getKeyManager().startBlockKeyUpdater();
|
||||
connectors.add(nnc);
|
||||
}
|
||||
|
||||
boolean done = false;
|
||||
@ -1730,9 +1627,6 @@ static class Cli extends Configured implements Tool {
|
||||
public int run(String[] args) {
|
||||
final long startTime = Time.now();
|
||||
final Configuration conf = getConf();
|
||||
WIN_WIDTH = conf.getLong(
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||
|
||||
try {
|
||||
checkReplicationPolicyCompatibility(conf);
|
||||
|
@ -0,0 +1,173 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* The class provides utilities for key and token management.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
||||
private static final Log LOG = LogFactory.getLog(KeyManager.class);
|
||||
|
||||
private final NamenodeProtocol namenode;
|
||||
|
||||
private final boolean isBlockTokenEnabled;
|
||||
private final boolean encryptDataTransfer;
|
||||
private boolean shouldRun;
|
||||
|
||||
private final BlockTokenSecretManager blockTokenSecretManager;
|
||||
private final BlockKeyUpdater blockKeyUpdater;
|
||||
private DataEncryptionKey encryptionKey;
|
||||
|
||||
public KeyManager(String blockpoolID, NamenodeProtocol namenode,
|
||||
boolean encryptDataTransfer, Configuration conf) throws IOException {
|
||||
this.namenode = namenode;
|
||||
this.encryptDataTransfer = encryptDataTransfer;
|
||||
|
||||
final ExportedBlockKeys keys = namenode.getBlockKeys();
|
||||
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
|
||||
if (isBlockTokenEnabled) {
|
||||
long updateInterval = keys.getKeyUpdateInterval();
|
||||
long tokenLifetime = keys.getTokenLifetime();
|
||||
LOG.info("Block token params received from NN: update interval="
|
||||
+ StringUtils.formatTime(updateInterval)
|
||||
+ ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
|
||||
String encryptionAlgorithm = conf.get(
|
||||
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||
this.blockTokenSecretManager = new BlockTokenSecretManager(
|
||||
updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
|
||||
this.blockTokenSecretManager.addKeys(keys);
|
||||
|
||||
// sync block keys with NN more frequently than NN updates its block keys
|
||||
this.blockKeyUpdater = new BlockKeyUpdater(updateInterval / 4);
|
||||
this.shouldRun = true;
|
||||
} else {
|
||||
this.blockTokenSecretManager = null;
|
||||
this.blockKeyUpdater = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void startBlockKeyUpdater() {
|
||||
if (blockKeyUpdater != null) {
|
||||
blockKeyUpdater.daemon.start();
|
||||
}
|
||||
}
|
||||
|
||||
/** Get an access token for a block. */
|
||||
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
|
||||
) throws IOException {
|
||||
if (!isBlockTokenEnabled) {
|
||||
return BlockTokenSecretManager.DUMMY_TOKEN;
|
||||
} else {
|
||||
if (!shouldRun) {
|
||||
throw new IOException(
|
||||
"Cannot get access token since BlockKeyUpdater is not running");
|
||||
}
|
||||
return blockTokenSecretManager.generateToken(null, eb,
|
||||
EnumSet.of(AccessMode.REPLACE, AccessMode.COPY));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataEncryptionKey newDataEncryptionKey() {
|
||||
if (encryptDataTransfer) {
|
||||
synchronized (this) {
|
||||
if (encryptionKey == null) {
|
||||
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
||||
}
|
||||
return encryptionKey;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
shouldRun = false;
|
||||
try {
|
||||
if (blockKeyUpdater != null) {
|
||||
blockKeyUpdater.daemon.interrupt();
|
||||
}
|
||||
} catch(Exception e) {
|
||||
LOG.warn("Exception shutting down access key updater thread", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically updates access keys.
|
||||
*/
|
||||
class BlockKeyUpdater implements Runnable, Closeable {
|
||||
private final Daemon daemon = new Daemon(this);
|
||||
private final long sleepInterval;
|
||||
|
||||
BlockKeyUpdater(final long sleepInterval) {
|
||||
this.sleepInterval = sleepInterval;
|
||||
LOG.info("Update block keys every " + StringUtils.formatTime(sleepInterval));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (shouldRun) {
|
||||
try {
|
||||
blockTokenSecretManager.addKeys(namenode.getBlockKeys());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to set keys", e);
|
||||
}
|
||||
Thread.sleep(sleepInterval);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("InterruptedException in block key updater thread", e);
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Exception in block key updater thread", e);
|
||||
shouldRun = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
daemon.interrupt();
|
||||
} catch(Exception e) {
|
||||
LOG.warn("Exception shutting down key updater thread", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
|
||||
/** A matcher interface for matching nodes. */
|
||||
public interface Matcher {
|
||||
/** Given the cluster topology, does the left node match the right node? */
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right);
|
||||
|
||||
/** Match datanodes in the same node group. */
|
||||
public static final Matcher SAME_NODE_GROUP = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameNodeGroup(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match datanodes in the same rack. */
|
||||
public static final Matcher SAME_RACK = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameRack(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match any datanode with any other datanode. */
|
||||
public static final Matcher ANY_OTHER = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return left != right;
|
||||
}
|
||||
};
|
||||
}
|
@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
* This window makes sure to keep blocks that have been moved within a fixed
|
||||
* time interval (default is 1.5 hour). Old window has blocks that are older;
|
||||
* Current window has blocks that are more recent; Cleanup method triggers the
|
||||
* check if blocks in the old window are more than the fixed time interval. If
|
||||
* yes, purge the old window and then move blocks in current window to old
|
||||
* window.
|
||||
*
|
||||
* @param <L> Location type
|
||||
*/
|
||||
public class MovedBlocks<L> {
|
||||
/** A class for keeping track of a block and its locations */
|
||||
public static class Locations<L> {
|
||||
private final Block block; // the block
|
||||
/** The locations of the replicas of the block. */
|
||||
private final List<L> locations = new ArrayList<L>(3);
|
||||
|
||||
public Locations(Block block) {
|
||||
this.block = block;
|
||||
}
|
||||
|
||||
/** clean block locations */
|
||||
public synchronized void clearLocations() {
|
||||
locations.clear();
|
||||
}
|
||||
|
||||
/** add a location */
|
||||
public synchronized void addLocation(L loc) {
|
||||
if (!locations.contains(loc)) {
|
||||
locations.add(loc);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return if the block is located on the given location. */
|
||||
public synchronized boolean isLocatedOn(L loc) {
|
||||
return locations.contains(loc);
|
||||
}
|
||||
|
||||
/** @return its locations */
|
||||
public synchronized List<L> getLocations() {
|
||||
return locations;
|
||||
}
|
||||
|
||||
/* @return the block */
|
||||
public Block getBlock() {
|
||||
return block;
|
||||
}
|
||||
|
||||
/* Return the length of the block */
|
||||
public long getNumBytes() {
|
||||
return block.getNumBytes();
|
||||
}
|
||||
}
|
||||
|
||||
private static final int CUR_WIN = 0;
|
||||
private static final int OLD_WIN = 1;
|
||||
private static final int NUM_WINS = 2;
|
||||
|
||||
private final long winTimeInterval;
|
||||
private long lastCleanupTime = Time.monotonicNow();
|
||||
private final List<Map<Block, Locations<L>>> movedBlocks
|
||||
= new ArrayList<Map<Block, Locations<L>>>(NUM_WINS);
|
||||
|
||||
/** initialize the moved blocks collection */
|
||||
public MovedBlocks(long winTimeInterval) {
|
||||
this.winTimeInterval = winTimeInterval;
|
||||
movedBlocks.add(newMap());
|
||||
movedBlocks.add(newMap());
|
||||
}
|
||||
|
||||
private Map<Block, Locations<L>> newMap() {
|
||||
return new HashMap<Block, Locations<L>>();
|
||||
}
|
||||
|
||||
/** add a block thus marking a block to be moved */
|
||||
public synchronized void put(Locations<L> block) {
|
||||
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
|
||||
}
|
||||
|
||||
/** @return if a block is marked as moved */
|
||||
public synchronized boolean contains(Block block) {
|
||||
return movedBlocks.get(CUR_WIN).containsKey(block) ||
|
||||
movedBlocks.get(OLD_WIN).containsKey(block);
|
||||
}
|
||||
|
||||
/** remove old blocks */
|
||||
public synchronized void cleanup() {
|
||||
long curTime = Time.monotonicNow();
|
||||
// check if old win is older than winWidth
|
||||
if (lastCleanupTime + winTimeInterval <= curTime) {
|
||||
// purge the old window
|
||||
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
|
||||
movedBlocks.set(CUR_WIN, newMap());
|
||||
lastCleanupTime = curTime;
|
||||
}
|
||||
}
|
||||
}
|
@ -17,113 +17,96 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
/**
|
||||
* The class provides utilities for {@link Balancer} to access a NameNode
|
||||
* The class provides utilities for accessing a NameNode.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class NameNodeConnector implements DataEncryptionKeyFactory {
|
||||
private static final Log LOG = Balancer.LOG;
|
||||
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||
public class NameNodeConnector implements Closeable {
|
||||
private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
|
||||
|
||||
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
|
||||
|
||||
final URI nameNodeUri;
|
||||
final String blockpoolID;
|
||||
private final URI nameNodeUri;
|
||||
private final String blockpoolID;
|
||||
|
||||
final NamenodeProtocol namenode;
|
||||
final ClientProtocol client;
|
||||
final FileSystem fs;
|
||||
final OutputStream out;
|
||||
private final NamenodeProtocol namenode;
|
||||
private final ClientProtocol client;
|
||||
private final KeyManager keyManager;
|
||||
|
||||
private final FileSystem fs;
|
||||
private final Path idPath;
|
||||
private final OutputStream out;
|
||||
|
||||
private final boolean isBlockTokenEnabled;
|
||||
private final boolean encryptDataTransfer;
|
||||
private boolean shouldRun;
|
||||
private long keyUpdaterInterval;
|
||||
// used for balancer
|
||||
private int notChangedIterations = 0;
|
||||
private BlockTokenSecretManager blockTokenSecretManager;
|
||||
private Daemon keyupdaterthread; // AccessKeyUpdater thread
|
||||
private DataEncryptionKey encryptionKey;
|
||||
|
||||
NameNodeConnector(URI nameNodeUri,
|
||||
public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
|
||||
Configuration conf) throws IOException {
|
||||
this.nameNodeUri = nameNodeUri;
|
||||
this.idPath = idPath;
|
||||
|
||||
this.namenode =
|
||||
NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
|
||||
.getProxy();
|
||||
this.client =
|
||||
NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
|
||||
.getProxy();
|
||||
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
||||
NamenodeProtocol.class).getProxy();
|
||||
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
|
||||
ClientProtocol.class).getProxy();
|
||||
this.fs = FileSystem.get(nameNodeUri, conf);
|
||||
|
||||
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
||||
this.blockpoolID = namespaceinfo.getBlockPoolID();
|
||||
|
||||
final ExportedBlockKeys keys = namenode.getBlockKeys();
|
||||
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
|
||||
if (isBlockTokenEnabled) {
|
||||
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
|
||||
long blockTokenLifetime = keys.getTokenLifetime();
|
||||
LOG.info("Block token params received from NN: keyUpdateInterval="
|
||||
+ blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
|
||||
+ blockTokenLifetime / (60 * 1000) + " min(s)");
|
||||
String encryptionAlgorithm = conf.get(
|
||||
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||
this.blockTokenSecretManager = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
|
||||
encryptionAlgorithm);
|
||||
this.blockTokenSecretManager.addKeys(keys);
|
||||
/*
|
||||
* Balancer should sync its block keys with NN more frequently than NN
|
||||
* updates its block keys
|
||||
*/
|
||||
this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
|
||||
LOG.info("Balancer will update its block keys every "
|
||||
+ keyUpdaterInterval / (60 * 1000) + " minute(s)");
|
||||
this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
|
||||
this.shouldRun = true;
|
||||
this.keyupdaterthread.start();
|
||||
}
|
||||
this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
|
||||
.getEncryptDataTransfer();
|
||||
// Check if there is another balancer running.
|
||||
final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
|
||||
this.keyManager = new KeyManager(blockpoolID, namenode,
|
||||
defaults.getEncryptDataTransfer(), conf);
|
||||
// Exit if there is another one running.
|
||||
out = checkAndMarkRunningBalancer();
|
||||
out = checkAndMarkRunning();
|
||||
if (out == null) {
|
||||
throw new IOException("Another balancer is running");
|
||||
throw new IOException("Another " + name + " is running.");
|
||||
}
|
||||
}
|
||||
|
||||
boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||
/** @return the block pool ID */
|
||||
public String getBlockpoolID() {
|
||||
return blockpoolID;
|
||||
}
|
||||
|
||||
/** @return the namenode proxy. */
|
||||
public NamenodeProtocol getNamenode() {
|
||||
return namenode;
|
||||
}
|
||||
|
||||
/** @return the client proxy. */
|
||||
public ClientProtocol getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
/** @return the key manager */
|
||||
public KeyManager getKeyManager() {
|
||||
return keyManager;
|
||||
}
|
||||
|
||||
/** Should the instance continue running? */
|
||||
public boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||
if (dispatchBlockMoveBytes > 0) {
|
||||
notChangedIterations = 0;
|
||||
} else {
|
||||
@ -137,53 +120,25 @@ boolean shouldContinue(long dispatchBlockMoveBytes) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Get an access token for a block. */
|
||||
Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
|
||||
) throws IOException {
|
||||
if (!isBlockTokenEnabled) {
|
||||
return BlockTokenSecretManager.DUMMY_TOKEN;
|
||||
} else {
|
||||
if (!shouldRun) {
|
||||
throw new IOException(
|
||||
"Can not get access token. BlockKeyUpdater is not running");
|
||||
}
|
||||
return blockTokenSecretManager.generateToken(null, eb,
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
|
||||
BlockTokenSecretManager.AccessMode.COPY));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataEncryptionKey newDataEncryptionKey() {
|
||||
if (encryptDataTransfer) {
|
||||
synchronized (this) {
|
||||
if (encryptionKey == null) {
|
||||
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
||||
}
|
||||
return encryptionKey;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/* The idea for making sure that there is no more than one balancer
|
||||
/**
|
||||
* The idea for making sure that there is no more than one instance
|
||||
* running in an HDFS is to create a file in the HDFS, write the hostname
|
||||
* of the machine on which the balancer is running to the file, but did not
|
||||
* close the file until the balancer exits.
|
||||
* This prevents the second balancer from running because it can not
|
||||
* of the machine on which the instance is running to the file, but not
|
||||
* close the file until it exits.
|
||||
*
|
||||
* This prevents the second instance from running because it can not
|
||||
* create the file while the first one is running.
|
||||
*
|
||||
* This method checks if there is any running balancer and
|
||||
* if no, mark yes if no.
|
||||
* This method checks whether an instance is already running and, if not, marks this one as running.
|
||||
* Note that this is an atomic operation.
|
||||
*
|
||||
* Return null if there is a running balancer; otherwise the output stream
|
||||
* to the newly created file.
|
||||
* @return null if there is a running instance;
|
||||
* otherwise, the output stream to the newly created file.
|
||||
*/
|
||||
private OutputStream checkAndMarkRunningBalancer() throws IOException {
|
||||
private OutputStream checkAndMarkRunning() throws IOException {
|
||||
try {
|
||||
final DataOutputStream out = fs.create(BALANCER_ID_PATH);
|
||||
final DataOutputStream out = fs.create(idPath);
|
||||
out.writeBytes(InetAddress.getLocalHost().getHostName());
|
||||
out.flush();
|
||||
return out;
|
||||
@ -196,24 +151,17 @@ private OutputStream checkAndMarkRunningBalancer() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/** Close the connection. */
|
||||
void close() {
|
||||
shouldRun = false;
|
||||
try {
|
||||
if (keyupdaterthread != null) {
|
||||
keyupdaterthread.interrupt();
|
||||
}
|
||||
} catch(Exception e) {
|
||||
LOG.warn("Exception shutting down access key updater thread", e);
|
||||
}
|
||||
@Override
|
||||
public void close() {
|
||||
keyManager.close();
|
||||
|
||||
// close the output file
|
||||
IOUtils.closeStream(out);
|
||||
if (fs != null) {
|
||||
try {
|
||||
fs.delete(BALANCER_ID_PATH, true);
|
||||
fs.delete(idPath, true);
|
||||
} catch(IOException ioe) {
|
||||
LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
|
||||
LOG.warn("Failed to delete " + idPath, ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -221,31 +169,6 @@ void close() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
|
||||
+ ", id=" + blockpoolID
|
||||
+ "]";
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically updates access keys.
|
||||
*/
|
||||
class BlockKeyUpdater implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (shouldRun) {
|
||||
try {
|
||||
blockTokenSecretManager.addKeys(namenode.getBlockKeys());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to set keys", e);
|
||||
}
|
||||
Thread.sleep(keyUpdaterInterval);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("InterruptedException in block key updater thread", e);
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Exception in block key updater thread", e);
|
||||
shouldRun = false;
|
||||
}
|
||||
}
|
||||
+ ", bpid=" + blockpoolID + "]";
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user