HDFS-11243. [SPS]: Add a protocol command from NN to DN for dropping the SPS work and queues. Contributed by Uma Maheswara Rao G

Rakesh Radhakrishnan 2017-01-31 23:44:01 +05:30 committed by Uma Maheswara Rao Gangumalla
parent f8fc96a66e
commit e34331c31d
12 changed files with 216 additions and 15 deletions


@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
@ -112,6 +113,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@ -143,6 +145,10 @@ public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
RegisterCommandProto.newBuilder().build();
private static final RegisterCommand REG_CMD = new RegisterCommand();
private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
DropSPSWorkCommandProto.newBuilder().build();
private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
new DropSPSWorkCommand();
private PBHelper() {
/** Hidden constructor */
@ -478,6 +484,8 @@ public static DatanodeCommand convert(DatanodeCommandProto proto) {
return PBHelper.convert(proto.getBlkECReconstructionCmd());
case BlockStorageMovementCommand:
return PBHelper.convert(proto.getBlkStorageMovementCmd());
case DropSPSWorkCommand:
return DROP_SPS_WORK_CMD;
default:
return null;
}
@ -617,6 +625,10 @@ public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
.setBlkStorageMovementCmd(
convert((BlockStorageMovementCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
.setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
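
Both directions of the new mapping are stateless, so the command can be sanity-checked with a simple round trip. A minimal fragment, for illustration only (not part of the patch):

  // Encode: the wire form is just the command type plus an empty
  // DropSPSWorkCommandProto payload.
  DatanodeCommand cmd = DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND;
  DatanodeCommandProto proto = PBHelper.convert(cmd);
  assert proto.getCmdType() == DatanodeCommandProto.Type.DropSPSWorkCommand;

  // Decode: PBHelper hands back the shared DROP_SPS_WORK_CMD instance.
  DatanodeCommand decoded = PBHelper.convert(proto);
  assert decoded.getAction() == DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND;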


@ -720,13 +720,13 @@ public void activate(Configuration conf, long blockTotal) {
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
if (sps != null && !haEnabled) {
sps.start();
sps.start(false);
}
}
public void close() {
if (sps != null) {
sps.stop();
sps.stop(false);
}
bmSafeMode.close();
try {
@ -5053,7 +5053,7 @@ public void activateSPS() {
return;
}
sps.start();
sps.start(true);
}
/**
@ -5067,12 +5067,7 @@ public void deactivateSPS() {
LOG.info("Storage policy satisfier is already stopped.");
return;
}
sps.stop();
// TODO: add command to DNs for stop in-progress processing SPS commands?
// to avoid confusions in cluster, I think sending commands from centralized
// place would be better to drop pending queues at DN. Anyway in progress
// work will be finished in a while, but this command can void starting
// fresh movements at DN.
sps.stop(true);
}
/**
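
The new boolean distinguishes a normal lifecycle transition from an admin-driven reconfiguration. The call sites above reduce to the following summary sketch (illustrative, not literal code):

  // Normal lifecycle: no DN-side action is required.
  sps.start(false);   // NN activation (non-HA startup)
  sps.stop(false);    // NN close/shutdown

  // Admin reconfiguration: stop(true) also clears the NN-side queues and
  // schedules a DropSPSWorkCommand to every datanode (see the
  // StoragePolicySatisfier and DatanodeManager changes below).
  sps.start(true);    // activateSPS()
  sps.stop(true);     // deactivateSPS()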


@ -214,6 +214,7 @@ public Type getType() {
*/
private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
new LinkedList<>();
private volatile boolean dropSPSWork = false;
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
@ -1104,4 +1105,21 @@ public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
return storageMovementBlocks.poll();
}
}
/**
* Set whether the SPS-related queues should be dropped on the DN side.
*
* @param dropSPSWork
* - true if the SPS queues need to be dropped, otherwise false.
*/
public synchronized void setDropSPSWork(boolean dropSPSWork) {
this.dropSPSWork = dropSPSWork;
}
/**
* @return true if the SPS queues need to be dropped at the DN.
*/
public synchronized boolean shouldDropSPSWork() {
return this.dropSPSWork;
}
}
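
The flag is deliberately one-shot: the DatanodeManager resets it once the command has been queued in a heartbeat response. Its intended lifecycle, as a small illustrative fragment (the descriptor reference is hypothetical):

  DatanodeDescriptor dn = ...;  // some registered datanode's descriptor
  dn.setDropSPSWork(true);      // marked by addDropSPSWorkCommandsToAllDNs()
  assert dn.shouldDropSPSWork();
  dn.setDropSPSWork(false);     // reset by handleHeartbeat() after queuing the command
  assert !dn.shouldDropSPSWork();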


@ -1750,6 +1750,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
blkStorageMovementInfosBatch.getBlockMovingInfo()));
}
if (nodeinfo.shouldDropSPSWork()) {
cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
// Set back to false to indicate that the new value has been sent to the
// datanode.
nodeinfo.setDropSPSWork(false);
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@ -1978,5 +1985,17 @@ public String getSlowDisksReport() {
return slowDiskTracker != null ?
slowDiskTracker.getSlowDiskReportAsJsonString() : null;
}
/**
* Mark all DNs to drop their SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be
* added to the heartbeat response, instructing each DN to drop its SPS queues.
*/
public void addDropSPSWorkCommandsToAllDNs() {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setDropSPSWork(true);
}
}
}
}
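
Delivery piggybacks on the heartbeat: addDropSPSWorkCommandsToAllDNs() only marks the descriptors, and each marked datanode picks the command up on its next heartbeat. A summary sketch of the flow added above (illustrative only):

  // On deactivation, StoragePolicySatisfier.stop(true) calls:
  blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();

  // Next handleHeartbeat() for each marked datanode:
  //   cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
  //   nodeinfo.setDropSPSWork(false);   // one-shot: sent once, then cleared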


@ -802,6 +802,10 @@ assert getBlockPoolId().equals(bp) :
blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
blkSPSCmd.getBlockMovingTasks());
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
dn.getStoragePolicySatisfyWorker().dropSPSWork();
break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}


@ -146,4 +146,16 @@ void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
moverTaskFutures.notify();
}
}
/**
* Clear the pending movement and movement result queues.
*/
void removeAll() {
synchronized (moverTaskFutures) {
moverTaskFutures.clear();
}
synchronized (movementResults) {
movementResults.clear();
}
}
}


@ -115,7 +115,6 @@ private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
@ -421,10 +420,31 @@ void remove(BlocksStorageMovementResult[] results) {
}
}
}
/**
* Clear the trackID vs movement status tracking map.
*/
void removeAll() {
synchronized (trackIdVsMovementStatus) {
trackIdVsMovementStatus.clear();
}
}
}
@VisibleForTesting
BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
return handler;
}
/**
* Drop the in-progress SPS work queues.
*/
public void dropSPSWork() {
LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
+ "So, none of the SPS Worker queued block movements will"
+ " be scheduled.");
movementTracker.removeAll();
handler.removeAll();
}
}
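
The new test near the end of the patch exercises exactly this path; in isolation the call reduces to the following fragment (illustrative only, the datanode reference is hypothetical):

  StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, datanode);
  // ... block moving tasks get queued via processBlockMovingTasks(...) ...
  worker.dropSPSWork();   // clears the movement tracker and completion-handler queues
  assert worker.getBlocksMovementsCompletionHandler()
      .getBlksMovementResults().isEmpty();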


@ -99,9 +99,14 @@ public StoragePolicySatisfier(final Namesystem namesystem,
* Start the storage policy satisfier daemon thread. Also start the block
* storage movements monitor to retry the attempts if needed.
*/
public synchronized void start() {
public synchronized void start(boolean reconfigStart) {
isRunning = true;
LOG.info("Starting StoragePolicySatisfier.");
if (reconfigStart) {
LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+ "activate it.");
} else {
LOG.info("Starting StoragePolicySatisfier.");
}
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@ -110,10 +115,17 @@ public synchronized void start() {
/**
* Stop the storage policy satisfier daemon thread.
*
* @param reconfigStop
* - true if the stop was triggered by an admin deactivation, in which
* case the NN-side queues are cleared and all DNs are told to drop
* their SPS queues.
*/
public synchronized void stop() {
public synchronized void stop(boolean reconfigStop) {
isRunning = false;
LOG.info("Stopping StoragePolicySatisfier.");
if (reconfigStop) {
LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+ "deactivate it.");
} else {
LOG.info("Stopping StoragePolicySatisfier.");
}
if (storagePolicySatisfierThread == null) {
return;
}
@ -123,7 +135,10 @@ public synchronized void stop() {
} catch (InterruptedException ie) {
}
this.storageMovementsMonitor.stop();
this.clearQueues();
if (reconfigStop) {
this.clearQueues();
this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
}
}
/**


@ -80,6 +80,8 @@ public interface DatanodeProtocol {
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
final static int DNA_DROP_SPS_WORK_COMMAND = 13; // drop sps work command
/**
* Register Datanode.


@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A DropSPSWorkCommand is an instruction to a datanode to drop the SPSWorker's
* pending block storage movement queues.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DropSPSWorkCommand extends DatanodeCommand {
public static final DropSPSWorkCommand DNA_DROP_SPS_WORK_COMMAND =
new DropSPSWorkCommand();
public DropSPSWorkCommand() {
super(DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND);
}
}
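
The command carries no payload; the datanode recognizes it purely by its action code. The NN therefore reuses the shared singleton when building heartbeat responses, and the DN-side handling (shown in the earlier hunk) is a one-line dispatch. Illustrative fragment:

  // NN side (DatanodeManager.handleHeartbeat):
  cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);

  // DN side (command dispatch):
  //   case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
  //     dn.getStoragePolicySatisfyWorker().dropSPSWork();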


@ -61,6 +61,7 @@ message DatanodeCommandProto {
BlockIdCommand = 8;
BlockECReconstructionCommand = 9;
BlockStorageMovementCommand = 10;
DropSPSWorkCommand = 11;
}
required Type cmdType = 1; // Type of the command
@ -76,6 +77,7 @@ message DatanodeCommandProto {
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
}
/**
@ -165,6 +167,13 @@ message BlockStorageMovementCommandProto {
repeated BlockStorageMovementProto blockStorageMovement = 3;
}
/**
* Instructs the datanode to drop its SPS work queues.
*/
message DropSPSWorkCommandProto {
// void
}
/**
* Block storage movement information
*/


@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -186,6 +188,63 @@ public void testMoveWithNoSpaceAvailable() throws Exception {
waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
}
/**
* Tests that dropSPSWork() clears all the queues.
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testDropSPSWork() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testDropSPSWork";
DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
DataNode src = cluster.getDataNodes().get(2);
DatanodeInfo targetDnInfo =
DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
StoragePolicySatisfyWorker worker =
new StoragePolicySatisfyWorker(conf, src);
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
List<LocatedBlock> locatedBlocks =
dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
BlockMovingInfo blockMovingInfo =
prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
locatedBlock.getLocations()[0], targetDnInfo,
locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
}
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
worker.processBlockMovingTasks(inode.getId(),
cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
// Wait till results queue build up
waitForBlockMovementResult(worker, inode.getId(), 30000);
worker.dropSPSWork();
assertTrue(worker.getBlocksMovementsCompletionHandler()
.getBlksMovementResults().size() == 0);
}
private void waitForBlockMovementResult(
final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BlocksStorageMovementResult> completedBlocks = worker
.getBlocksMovementsCompletionHandler().getBlksMovementResults();
return completedBlocks.size() > 0;
}
}, 100, timeout);
}
private void waitForBlockMovementCompletion(
final StoragePolicySatisfyWorker worker, final long inodeId,
int expectedFailedItemsCount, int timeout) throws Exception {