HDFS-10884: [SPS]: Add block movement tracker to track the completion of block movement future tasks at DN. Contributed by Rakesh R
This commit is contained in:
parent
e2a15d18bb
commit
24add8c2f8
@ -0,0 +1,146 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletionService;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used to track the completion of block movement future tasks.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class BlockStorageMovementTracker implements Runnable {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(BlockStorageMovementTracker.class);
|
||||||
|
private final CompletionService<BlockMovementResult> moverCompletionService;
|
||||||
|
private final BlocksMovementsCompletionHandler blksMovementscompletionHandler;
|
||||||
|
|
||||||
|
// Keeps the information - trackID vs its list of blocks
|
||||||
|
private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
|
||||||
|
private final Map<Long, List<BlockMovementResult>> movementResults;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BlockStorageMovementTracker constructor.
|
||||||
|
*
|
||||||
|
* @param moverCompletionService
|
||||||
|
* completion service.
|
||||||
|
* @param handler
|
||||||
|
* blocks movements completion handler
|
||||||
|
*/
|
||||||
|
public BlockStorageMovementTracker(
|
||||||
|
CompletionService<BlockMovementResult> moverCompletionService,
|
||||||
|
BlocksMovementsCompletionHandler handler) {
|
||||||
|
this.moverCompletionService = moverCompletionService;
|
||||||
|
this.moverTaskFutures = new HashMap<>();
|
||||||
|
this.blksMovementscompletionHandler = handler;
|
||||||
|
this.movementResults = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
if (moverTaskFutures.size() <= 0) {
|
||||||
|
try {
|
||||||
|
synchronized (moverTaskFutures) {
|
||||||
|
// Waiting for mover tasks.
|
||||||
|
moverTaskFutures.wait(2000);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Future<BlockMovementResult> future = moverCompletionService.take();
|
||||||
|
if (future != null) {
|
||||||
|
BlockMovementResult result = future.get();
|
||||||
|
LOG.debug("Completed block movement. {}", result);
|
||||||
|
long trackId = result.getTrackId();
|
||||||
|
List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
|
||||||
|
.get(trackId);
|
||||||
|
blocksMoving.remove(future);
|
||||||
|
|
||||||
|
List<BlockMovementResult> resultPerTrackIdList =
|
||||||
|
addMovementResultToTrackIdList(result);
|
||||||
|
|
||||||
|
// Completed all the scheduled blocks movement under this 'trackId'.
|
||||||
|
if (blocksMoving.isEmpty()) {
|
||||||
|
synchronized (moverTaskFutures) {
|
||||||
|
moverTaskFutures.remove(trackId);
|
||||||
|
}
|
||||||
|
// handle completed blocks movements per trackId.
|
||||||
|
blksMovementscompletionHandler.handle(resultPerTrackIdList);
|
||||||
|
movementResults.remove(trackId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
// TODO: Do we need failure retries and implement the same if required.
|
||||||
|
LOG.error("Exception while moving block replica to target storage type",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<BlockMovementResult> addMovementResultToTrackIdList(
|
||||||
|
BlockMovementResult result) {
|
||||||
|
long trackId = result.getTrackId();
|
||||||
|
List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
|
||||||
|
if (perTrackIdList == null) {
|
||||||
|
perTrackIdList = new ArrayList<>();
|
||||||
|
movementResults.put(trackId, perTrackIdList);
|
||||||
|
}
|
||||||
|
perTrackIdList.add(result);
|
||||||
|
return perTrackIdList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add future task to the tracking list to check the completion status of the
|
||||||
|
* block movement.
|
||||||
|
*
|
||||||
|
* @param trackID
|
||||||
|
* tracking Id
|
||||||
|
* @param futureTask
|
||||||
|
* future task used for moving the respective block
|
||||||
|
*/
|
||||||
|
void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
|
||||||
|
synchronized (moverTaskFutures) {
|
||||||
|
List<Future<BlockMovementResult>> futures = moverTaskFutures
|
||||||
|
.get(Long.valueOf(trackID));
|
||||||
|
// null for the first task
|
||||||
|
if (futures == null) {
|
||||||
|
futures = new ArrayList<>();
|
||||||
|
moverTaskFutures.put(trackID, futures);
|
||||||
|
}
|
||||||
|
futures.add(futureTask);
|
||||||
|
// Notify waiting tracker thread about the newly added tasks.
|
||||||
|
moverTaskFutures.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -33,7 +33,6 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
@ -65,6 +64,8 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
|
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
|
||||||
* These commands would be issued from NameNode as part of Datanode's heart beat
|
* These commands would be issued from NameNode as part of Datanode's heart beat
|
||||||
@ -82,8 +83,10 @@ public class StoragePolicySatisfyWorker {
|
|||||||
|
|
||||||
private final int moverThreads;
|
private final int moverThreads;
|
||||||
private final ExecutorService moveExecutor;
|
private final ExecutorService moveExecutor;
|
||||||
private final CompletionService<Void> moverExecutorCompletionService;
|
private final CompletionService<BlockMovementResult> moverCompletionService;
|
||||||
private final List<Future<Void>> moverTaskFutures;
|
private final BlocksMovementsCompletionHandler handler;
|
||||||
|
private final BlockStorageMovementTracker movementTracker;
|
||||||
|
private Daemon movementTrackerThread;
|
||||||
|
|
||||||
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
|
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
@ -92,9 +95,13 @@ public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
|
|||||||
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
|
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
|
||||||
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
|
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
|
||||||
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
|
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
|
||||||
moverExecutorCompletionService = new ExecutorCompletionService<>(
|
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
|
||||||
moveExecutor);
|
handler = new BlocksMovementsCompletionHandler();
|
||||||
moverTaskFutures = new ArrayList<>();
|
movementTracker = new BlockStorageMovementTracker(moverCompletionService,
|
||||||
|
handler);
|
||||||
|
movementTrackerThread = new Daemon(movementTracker);
|
||||||
|
movementTrackerThread.setName("BlockStorageMovementTracker");
|
||||||
|
movementTrackerThread.start();
|
||||||
// TODO: Needs to manage the number of concurrent moves per DataNode.
|
// TODO: Needs to manage the number of concurrent moves per DataNode.
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,10 +140,6 @@ public void rejectedExecution(Runnable runnable,
|
|||||||
* separate thread. Each task will move the block replica to the target node &
|
* separate thread. Each task will move the block replica to the target node &
|
||||||
* wait for the completion.
|
* wait for the completion.
|
||||||
*
|
*
|
||||||
* TODO: Presently this function is a blocking call, this has to be refined by
|
|
||||||
* moving the tracking logic to another tracker thread. HDFS-10884 jira
|
|
||||||
* addresses the same.
|
|
||||||
*
|
|
||||||
* @param trackID
|
* @param trackID
|
||||||
* unique tracking identifier
|
* unique tracking identifier
|
||||||
* @param blockPoolID
|
* @param blockPoolID
|
||||||
@ -146,68 +149,64 @@ public void rejectedExecution(Runnable runnable,
|
|||||||
*/
|
*/
|
||||||
public void processBlockMovingTasks(long trackID, String blockPoolID,
|
public void processBlockMovingTasks(long trackID, String blockPoolID,
|
||||||
Collection<BlockMovingInfo> blockMovingInfos) {
|
Collection<BlockMovingInfo> blockMovingInfos) {
|
||||||
Future<Void> moveCallable = null;
|
|
||||||
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
|
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
|
||||||
assert blkMovingInfo
|
assert blkMovingInfo
|
||||||
.getSources().length == blkMovingInfo.getTargets().length;
|
.getSources().length == blkMovingInfo.getTargets().length;
|
||||||
|
|
||||||
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
|
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
|
||||||
BlockMovingTask blockMovingTask = new BlockMovingTask(
|
BlockMovingTask blockMovingTask = new BlockMovingTask(
|
||||||
blkMovingInfo.getBlock(), blockPoolID,
|
trackID, blockPoolID, blkMovingInfo.getBlock(),
|
||||||
blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
|
blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
|
||||||
|
blkMovingInfo.getSourceStorageTypes()[i],
|
||||||
blkMovingInfo.getTargetStorageTypes()[i]);
|
blkMovingInfo.getTargetStorageTypes()[i]);
|
||||||
moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
|
Future<BlockMovementResult> moveCallable = moverCompletionService
|
||||||
moverTaskFutures.add(moveCallable);
|
.submit(blockMovingTask);
|
||||||
}
|
movementTracker.addBlock(trackID, moveCallable);
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < moverTaskFutures.size(); i++) {
|
|
||||||
try {
|
|
||||||
moveCallable = moverExecutorCompletionService.take();
|
|
||||||
moveCallable.get();
|
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
|
||||||
// TODO: Failure retries and report back the error to NameNode.
|
|
||||||
LOG.error("Exception while moving block replica to target storage type",
|
|
||||||
e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class encapsulates the process of moving the block replica to the
|
* This class encapsulates the process of moving the block replica to the
|
||||||
* given target.
|
* given target and wait for the response.
|
||||||
*/
|
*/
|
||||||
private class BlockMovingTask implements Callable<Void> {
|
private class BlockMovingTask implements Callable<BlockMovementResult> {
|
||||||
|
private final long trackID;
|
||||||
|
private final String blockPoolID;
|
||||||
private final Block block;
|
private final Block block;
|
||||||
private final DatanodeInfo source;
|
private final DatanodeInfo source;
|
||||||
private final DatanodeInfo target;
|
private final DatanodeInfo target;
|
||||||
|
private final StorageType srcStorageType;
|
||||||
private final StorageType targetStorageType;
|
private final StorageType targetStorageType;
|
||||||
private String blockPoolID;
|
|
||||||
|
|
||||||
BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
|
BlockMovingTask(long trackID, String blockPoolID, Block block,
|
||||||
DatanodeInfo target, StorageType targetStorageType) {
|
DatanodeInfo source, DatanodeInfo target,
|
||||||
this.block = block;
|
StorageType srcStorageType, StorageType targetStorageType) {
|
||||||
|
this.trackID = trackID;
|
||||||
this.blockPoolID = blockPoolID;
|
this.blockPoolID = blockPoolID;
|
||||||
|
this.block = block;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
|
this.srcStorageType = srcStorageType;
|
||||||
this.targetStorageType = targetStorageType;
|
this.targetStorageType = targetStorageType;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() {
|
public BlockMovementResult call() {
|
||||||
moveBlock();
|
BlockMovementStatus status = moveBlock();
|
||||||
return null;
|
return new BlockMovementResult(trackID, block.getBlockId(), target,
|
||||||
|
status);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveBlock() {
|
private BlockMovementStatus moveBlock() {
|
||||||
LOG.info("Start moving block {}", block);
|
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
|
||||||
|
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
|
||||||
LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
|
block, source, target, srcStorageType, targetStorageType);
|
||||||
+ "storageType:{}", block, source, target, targetStorageType);
|
|
||||||
Socket sock = null;
|
Socket sock = null;
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
DataInputStream in = null;
|
DataInputStream in = null;
|
||||||
try {
|
try {
|
||||||
|
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
|
||||||
DNConf dnConf = datanode.getDnConf();
|
DNConf dnConf = datanode.getDnConf();
|
||||||
String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
|
String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
|
||||||
sock = datanode.newSocket();
|
sock = datanode.newSocket();
|
||||||
@ -218,7 +217,6 @@ private void moveBlock() {
|
|||||||
|
|
||||||
OutputStream unbufOut = sock.getOutputStream();
|
OutputStream unbufOut = sock.getOutputStream();
|
||||||
InputStream unbufIn = sock.getInputStream();
|
InputStream unbufIn = sock.getInputStream();
|
||||||
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
|
|
||||||
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
|
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
|
||||||
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
|
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
|
||||||
|
|
||||||
@ -239,12 +237,14 @@ private void moveBlock() {
|
|||||||
"Successfully moved block:{} from src:{} to destin:{} for"
|
"Successfully moved block:{} from src:{} to destin:{} for"
|
||||||
+ " satisfying storageType:{}",
|
+ " satisfying storageType:{}",
|
||||||
block, source, target, targetStorageType);
|
block, source, target, targetStorageType);
|
||||||
|
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// TODO: handle failure retries
|
// TODO: handle failure retries
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Failed to move block:{} from src:{} to destin:{} to satisfy "
|
"Failed to move block:{} from src:{} to destin:{} to satisfy "
|
||||||
+ "storageType:{}",
|
+ "storageType:{}",
|
||||||
block, source, target, targetStorageType, e);
|
block, source, target, targetStorageType, e);
|
||||||
|
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
@ -272,4 +272,102 @@ private void receiveResponse(DataInputStream in) throws IOException {
|
|||||||
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
|
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Block movement status code.
|
||||||
|
*/
|
||||||
|
enum BlockMovementStatus {
|
||||||
|
/** Success. */
|
||||||
|
DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
|
||||||
|
/**
|
||||||
|
* Failure due to generation time stamp mismatches or network errors
|
||||||
|
* or no available space.
|
||||||
|
*/
|
||||||
|
DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
|
||||||
|
|
||||||
|
// TODO: need to support different type of failures. Failure due to network
|
||||||
|
// errors, block pinned, no space available etc.
|
||||||
|
|
||||||
|
private final int code;
|
||||||
|
|
||||||
|
private BlockMovementStatus(int code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the status code.
|
||||||
|
*/
|
||||||
|
int getStatusCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents result from a block movement task. This will have the
|
||||||
|
* information of the task which was successful or failed due to errors.
|
||||||
|
*/
|
||||||
|
static class BlockMovementResult {
|
||||||
|
private final long trackId;
|
||||||
|
private final long blockId;
|
||||||
|
private final DatanodeInfo target;
|
||||||
|
private final BlockMovementStatus status;
|
||||||
|
|
||||||
|
public BlockMovementResult(long trackId, long blockId,
|
||||||
|
DatanodeInfo target, BlockMovementStatus status) {
|
||||||
|
this.trackId = trackId;
|
||||||
|
this.blockId = blockId;
|
||||||
|
this.target = target;
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getTrackId() {
|
||||||
|
return trackId;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getBlockId() {
|
||||||
|
return blockId;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockMovementStatus getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new StringBuilder().append("Block movement result(\n ")
|
||||||
|
.append("track id: ").append(trackId).append(" block id: ")
|
||||||
|
.append(blockId).append(" target node: ").append(target)
|
||||||
|
.append(" movement status: ").append(status).append(")").toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocks movements completion handler, which is used to collect details of
|
||||||
|
* the completed list of block movements and notify the namenode about the
|
||||||
|
* success or failures.
|
||||||
|
*/
|
||||||
|
static class BlocksMovementsCompletionHandler {
|
||||||
|
private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collect all the block movement results and notify namenode.
|
||||||
|
*
|
||||||
|
* @param results
|
||||||
|
* result of all the block movements per trackId
|
||||||
|
*/
|
||||||
|
void handle(List<BlockMovementResult> results) {
|
||||||
|
completedBlocks.addAll(results);
|
||||||
|
// TODO: notify namenode about the success/failures.
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
List<BlockMovementResult> getCompletedBlocks() {
|
||||||
|
return completedBlocks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,10 +35,10 @@
|
|||||||
* service. After the block movement this DataNode sends response back to the
|
* service. After the block movement this DataNode sends response back to the
|
||||||
* NameNode about the movement status.
|
* NameNode about the movement status.
|
||||||
*
|
*
|
||||||
* The coordinator datanode will use 'trackId' identifier to coordinate the block
|
* The coordinator datanode will use 'trackId' identifier to coordinate the
|
||||||
* movement of the given set of blocks. TrackId is a unique identifier that
|
* block movement of the given set of blocks. TrackId is a unique identifier
|
||||||
* represents a group of blocks. Namenode will generate this unique value and
|
* that represents a group of blocks. Namenode will generate this unique value
|
||||||
* send it to the coordinator datanode along with the
|
* and send it to the coordinator datanode along with the
|
||||||
* BlockStorageMovementCommand. Datanode will monitor the completion of the
|
* BlockStorageMovementCommand. Datanode will monitor the completion of the
|
||||||
* block movements that grouped under this trackId and notifies Namenode about
|
* block movements that grouped under this trackId and notifies Namenode about
|
||||||
* the completion status.
|
* the completion status.
|
||||||
@ -153,11 +153,11 @@ public String toString() {
|
|||||||
return new StringBuilder().append("BlockMovingInfo(\n ")
|
return new StringBuilder().append("BlockMovingInfo(\n ")
|
||||||
.append("Moving block: ").append(blk).append(" From: ")
|
.append("Moving block: ").append(blk).append(" From: ")
|
||||||
.append(Arrays.asList(sourceNodes)).append(" To: [")
|
.append(Arrays.asList(sourceNodes)).append(" To: [")
|
||||||
.append(Arrays.asList(targetNodes)).append(")\n")
|
.append(Arrays.asList(targetNodes)).append("\n ")
|
||||||
.append(" sourceStorageTypes: ")
|
.append(" sourceStorageTypes: ")
|
||||||
.append(Arrays.toString(sourceStorageTypes))
|
.append(Arrays.toString(sourceStorageTypes))
|
||||||
.append(" targetStorageTypes: ")
|
.append(" targetStorageTypes: ")
|
||||||
.append(Arrays.toString(targetStorageTypes)).toString();
|
.append(Arrays.toString(targetStorageTypes)).append(")").toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -33,10 +34,15 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
|
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -51,8 +57,9 @@ public class TestStoragePolicySatisfyWorker {
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(TestStoragePolicySatisfyWorker.class);
|
.getLogger(TestStoragePolicySatisfyWorker.class);
|
||||||
|
|
||||||
private static final int DEFAULT_BLOCK_SIZE = 100;
|
private static final int DEFAULT_BLOCK_SIZE = 100;
|
||||||
|
private MiniDFSCluster cluster = null;
|
||||||
|
private final Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
private static void initConf(Configuration conf) {
|
private static void initConf(Configuration conf) {
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
@ -63,64 +70,141 @@ private static void initConf(Configuration conf) {
|
|||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
initConf(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests to verify that the block replica is moving to ARCHIVE storage type to
|
* Tests to verify that the block replica is moving to ARCHIVE storage type to
|
||||||
* fulfill the storage policy requirement.
|
* fulfill the storage policy requirement.
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 120000)
|
@Test(timeout = 120000)
|
||||||
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
|
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
|
||||||
initConf(conf);
|
.storageTypes(
|
||||||
final MiniDFSCluster cluster =
|
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(4)
|
{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
.storageTypes(
|
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||||
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
|
||||||
{StorageType.DISK, StorageType.ARCHIVE},
|
.build();
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
cluster.waitActive();
|
||||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
.build();
|
final String file = "/testMoveSingleBlockToAnotherDatanode";
|
||||||
try {
|
// write to DISK
|
||||||
cluster.waitActive();
|
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
|
||||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
out.writeChars("testMoveSingleBlockToAnotherDatanode");
|
||||||
final String file = "/testMoveSingleBlockToAnotherDatanode";
|
out.close();
|
||||||
// write to DISK
|
|
||||||
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
|
|
||||||
out.writeChars("testMoveSingleBlockToAnotherDatanode");
|
|
||||||
out.close();
|
|
||||||
|
|
||||||
// verify before movement
|
// verify before movement
|
||||||
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
StorageType[] storageTypes = lb.getStorageTypes();
|
StorageType[] storageTypes = lb.getStorageTypes();
|
||||||
for (StorageType storageType : storageTypes) {
|
for (StorageType storageType : storageTypes) {
|
||||||
Assert.assertTrue(StorageType.DISK == storageType);
|
Assert.assertTrue(StorageType.DISK == storageType);
|
||||||
}
|
|
||||||
// move to ARCHIVE
|
|
||||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
|
||||||
|
|
||||||
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
|
||||||
DataNode src = cluster.getDataNodes().get(3);
|
|
||||||
DatanodeInfo targetDnInfo = DFSTestUtil
|
|
||||||
.getLocalDatanodeInfo(src.getXferPort());
|
|
||||||
|
|
||||||
// TODO: Need to revisit this when NN is implemented to be able to send
|
|
||||||
// block moving commands.
|
|
||||||
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
|
|
||||||
src);
|
|
||||||
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
|
|
||||||
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
|
|
||||||
lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
|
|
||||||
lb.getStorageTypes()[0], StorageType.ARCHIVE);
|
|
||||||
blockMovingInfos.add(blockMovingInfo);
|
|
||||||
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
|
|
||||||
worker.processBlockMovingTasks(inode.getId(),
|
|
||||||
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
|
|
||||||
cluster.triggerHeartbeats();
|
|
||||||
|
|
||||||
// Wait till NameNode notified about the block location details
|
|
||||||
waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
|
|
||||||
} finally {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
}
|
||||||
|
// move to ARCHIVE
|
||||||
|
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||||
|
|
||||||
|
FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||||
|
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||||
|
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
|
// Wait till NameNode notified about the block location details
|
||||||
|
waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that satisfy worker can't move blocks. If specified target
|
||||||
|
* datanode doesn't have enough space to accommodate the moving block.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testMoveWithNoSpaceAvailable() throws Exception {
|
||||||
|
final long capacity = 150;
|
||||||
|
final String rack0 = "/rack0";
|
||||||
|
final String rack1 = "/rack1";
|
||||||
|
long[] capacities = new long[] {capacity, capacity, capacity / 2};
|
||||||
|
String[] hosts = {"host0", "host1", "host2"};
|
||||||
|
String[] racks = {rack0, rack1, rack0};
|
||||||
|
int numOfDatanodes = capacities.length;
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes)
|
||||||
|
.hosts(hosts).racks(racks).simulatedCapacities(capacities)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
|
{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
|
||||||
|
.build();
|
||||||
|
|
||||||
|
cluster.waitActive();
|
||||||
|
InetSocketAddress[] favoredNodes = new InetSocketAddress[3];
|
||||||
|
for (int i = 0; i < favoredNodes.length; i++) {
|
||||||
|
// DFSClient will attempt reverse lookup. In case it resolves
|
||||||
|
// "127.0.0.1" to "localhost", we manually specify the hostname.
|
||||||
|
favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
|
||||||
|
}
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String file = "/testMoveWithNoSpaceAvailable";
|
||||||
|
DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100,
|
||||||
|
DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes);
|
||||||
|
|
||||||
|
// verify before movement
|
||||||
|
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
StorageType[] storageTypes = lb.getStorageTypes();
|
||||||
|
for (StorageType storageType : storageTypes) {
|
||||||
|
Assert.assertTrue(StorageType.DISK == storageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// move to ARCHIVE
|
||||||
|
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||||
|
|
||||||
|
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
DataNode src = cluster.getDataNodes().get(2);
|
||||||
|
DatanodeInfo targetDnInfo = DFSTestUtil
|
||||||
|
.getLocalDatanodeInfo(src.getXferPort());
|
||||||
|
|
||||||
|
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
|
||||||
|
src);
|
||||||
|
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
|
||||||
|
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
|
||||||
|
lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
|
||||||
|
lb.getStorageTypes()[0], StorageType.ARCHIVE);
|
||||||
|
blockMovingInfos.add(blockMovingInfo);
|
||||||
|
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
|
||||||
|
worker.processBlockMovingTasks(inode.getId(),
|
||||||
|
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
|
||||||
|
|
||||||
|
waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForBlockMovementCompletion(
|
||||||
|
final StoragePolicySatisfyWorker worker, final long inodeId,
|
||||||
|
int expectedFailedItemsCount, int timeout) throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
List<BlockMovementResult> completedBlocks = worker
|
||||||
|
.getBlocksMovementsCompletionHandler().getCompletedBlocks();
|
||||||
|
int failedCount = 0;
|
||||||
|
for (BlockMovementResult blockMovementResult : completedBlocks) {
|
||||||
|
if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
|
||||||
|
blockMovementResult.getStatus()) {
|
||||||
|
failedCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Block movement completed count={}, expected={} and actual={}",
|
||||||
|
completedBlocks.size(), expectedFailedItemsCount, failedCount);
|
||||||
|
return expectedFailedItemsCount == failedCount;
|
||||||
|
}
|
||||||
|
}, 100, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForLocatedBlockWithArchiveStorageType(
|
private void waitForLocatedBlockWithArchiveStorageType(
|
||||||
@ -150,7 +234,7 @@ public Boolean get() {
|
|||||||
}, 100, timeout);
|
}, 100, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockMovingInfo prepareBlockMovingInfo(Block block,
|
private BlockMovingInfo prepareBlockMovingInfo(Block block,
|
||||||
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
|
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
|
||||||
StorageType targetStorageType) {
|
StorageType targetStorageType) {
|
||||||
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
|
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
|
||||||
|
Loading…
Reference in New Issue
Block a user