HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R

This commit is contained in:
Rakesh Radhakrishnan 2016-11-03 09:39:14 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 24add8c2f8
commit 0f2d1ddc2c
25 changed files with 368 additions and 43 deletions

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -138,7 +139,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@ -161,6 +163,11 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
if (slowDisks.haveSlowDisks()) {
builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
}
// Adding blocks movement results to the heart beat request.
builder.addAllBlksMovementResults(
PBHelper.convertBlksMovResults(blksMovementResults));
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

View File

@ -122,7 +122,9 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
PBHelper.convertBlksMovResults(
request.getBlksMovementResultsList()));
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@ -102,6 +103,8 @@
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@ -956,6 +959,55 @@ public static SlowDiskReports convertSlowDiskInfo(
return SlowDiskReports.create(slowDisksMap);
}
public static BlocksStorageMovementResult[] convertBlksMovResults(
List<BlocksStorageMovementResultProto> protos) {
BlocksStorageMovementResult[] results =
new BlocksStorageMovementResult[protos.size()];
for (int i = 0; i < protos.size(); i++) {
BlocksStorageMovementResultProto resultProto = protos.get(i);
BlocksStorageMovementResult.Status status;
switch (resultProto.getStatus()) {
case SUCCESS:
status = Status.SUCCESS;
break;
case FAILURE:
status = Status.FAILURE;
break;
default:
throw new AssertionError("Unknown status: " + resultProto.getStatus());
}
results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
status);
}
return results;
}
public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
BlocksStorageMovementResult[] blocksMovementResults) {
List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
new ArrayList<>();
BlocksStorageMovementResultProto.Builder builder =
BlocksStorageMovementResultProto.newBuilder();
for (int i = 0; i < blocksMovementResults.length; i++) {
BlocksStorageMovementResult report = blocksMovementResults[i];
builder.setTrackID(report.getTrackId());
BlocksStorageMovementResultProto.Status status;
switch (report.getStatus()) {
case SUCCESS:
status = BlocksStorageMovementResultProto.Status.SUCCESS;
break;
case FAILURE:
status = BlocksStorageMovementResultProto.Status.FAILURE;
break;
default:
throw new AssertionError("Unknown status: " + report.getStatus());
}
builder.setStatus(status);
blocksMovementResultsProto.add(builder.build());
}
return blocksMovementResultsProto;
}
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

View File

@ -5000,4 +5000,8 @@ public ProvidedStorageMap getProvidedStorageMap() {
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
}
public StoragePolicySatisfier getStoragePolicySatisfier() {
return sps;
}
}

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -511,6 +512,10 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
outliersReportDue && dn.getDiskMetrics() != null ?
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
BlocksStorageMovementResult[] blksMovementResults =
getBlocksMovementResults();
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
@ -521,15 +526,33 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
slowDisks,
blksMovementResults);
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextOutlierReport();
}
// Remove the blocks movement results after successfully transferring
// to namenode.
dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
.remove(blksMovementResults);
return response;
}
private BlocksStorageMovementResult[] getBlocksMovementResults() {
List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
.getBlksMovementResults();
BlocksStorageMovementResult[] blksMovementResult =
new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
trackIdVsMovementStatus.toArray(blksMovementResult);
return blksMovementResult;
}
@VisibleForTesting
void sendLifelineForTests() throws IOException {
lifelineSender.sendLifeline();

View File

@ -29,6 +29,7 @@
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
@ -57,6 +58,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -218,7 +220,8 @@ private BlockMovementStatus moveBlock() {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{targetStorageType}, new String[0]);
DataEncryptionKeyFactory keyFactory = datanode
.getDataEncryptionKeyFactoryForBlock(extendedBlock);
@ -257,7 +260,7 @@ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
StorageType destinStorageType) throws IOException {
new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
srcDn.getDatanodeUuid(), srcDn);
srcDn.getDatanodeUuid(), srcDn, null);
}
/** Receive a reportedBlock copy response from the input stream. */
@ -276,7 +279,7 @@ private void receiveResponse(DataInputStream in) throws IOException {
/**
* Block movement status code.
*/
enum BlockMovementStatus {
public static enum BlockMovementStatus {
/** Success. */
DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
/**
@ -343,26 +346,72 @@ public String toString() {
/**
* Blocks movements completion handler, which is used to collect details of
* the completed list of block movements and notify the namenode about the
* success or failures.
* the completed list of block movements and this status(success or failure)
* will be send to the namenode via heartbeat.
*/
static class BlocksMovementsCompletionHandler {
private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
new ArrayList<>();
/**
* Collect all the block movement results and notify namenode.
* Collect all the block movement results. Later this will be send to
* namenode via heart beat.
*
* @param results
* result of all the block movements per trackId
*/
void handle(List<BlockMovementResult> results) {
completedBlocks.addAll(results);
// TODO: notify namenode about the success/failures.
void handle(List<BlockMovementResult> resultsPerTrackId) {
BlocksStorageMovementResult.Status status =
BlocksStorageMovementResult.Status.SUCCESS;
long trackId = -1;
for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
trackId = blockMovementResult.getTrackId();
if (blockMovementResult.status ==
BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
status = BlocksStorageMovementResult.Status.FAILURE;
// If any of the block movement is failed, then mark as failure so
// that namenode can take a decision to retry the blocks associated to
// the given trackId.
break;
}
}
// Adding to the tracking results list. Later this will be send to
// namenode via datanode heartbeat.
synchronized (trackIdVsMovementStatus) {
trackIdVsMovementStatus.add(
new BlocksStorageMovementResult(trackId, status));
}
}
@VisibleForTesting
List<BlockMovementResult> getCompletedBlocks() {
return completedBlocks;
/**
* @return unmodifiable list of blocks storage movement results.
*/
List<BlocksStorageMovementResult> getBlksMovementResults() {
synchronized (trackIdVsMovementStatus) {
if (trackIdVsMovementStatus.size() <= 0) {
return new ArrayList<>();
}
List<BlocksStorageMovementResult> results = Collections
.unmodifiableList(trackIdVsMovementStatus);
return results;
}
}
/**
* Remove the blocks storage movement results.
*
* @param results
* set of blocks storage movement results
*/
void remove(BlocksStorageMovementResult[] results) {
if (results != null) {
synchronized (trackIdVsMovementStatus) {
for (BlocksStorageMovementResult blocksMovementResult : results) {
trackIdVsMovementStatus.remove(blocksMovementResult);
}
}
}
}
}

View File

@ -264,6 +264,7 @@
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -3860,7 +3861,8 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults) throws IOException {
readLock();
try {
//get datanode commands
@ -3874,6 +3876,12 @@ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
// TODO: Handle blocks movement results send by the coordinator datanode.
// This has to be revisited as part of HDFS-11029.
blockManager.getStoragePolicySatisfier()
.handleBlocksStorageMovementResults(blksMovementResults);
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),

View File

@ -156,6 +156,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -1498,13 +1499,15 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers, slowDisks);
slowPeers, slowDisks,
blkMovementStatus);
}
@Override // DatanodeProtocol

View File

@ -39,11 +39,14 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Setting storagePolicy on a file after the file write will only update the new
* storage policy type in Namespace, but physical block storage movement will
@ -394,4 +397,24 @@ private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
return typeNodeMap.get(type);
}
}
// TODO: Temporarily keeping the results for assertion. This has to be
// revisited as part of HDFS-11029.
@VisibleForTesting
List<BlocksStorageMovementResult> results = new ArrayList<>();
/**
* Receives the movement results of collection of blocks associated to a
* trackId.
*
* @param blksMovementResults
* movement status of the set of blocks associated to a trackId.
*/
void handleBlocksStorageMovementResults(
BlocksStorageMovementResult[] blksMovementResults) {
if (blksMovementResults.length <= 0) {
return;
}
results.addAll(Arrays.asList(blksMovementResults));
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
/**
* This class represents, movement status of a set of blocks associated to a
* track Id.
*/
public class BlocksStorageMovementResult {
private final long trackId;
private final Status status;
/**
* SUCCESS - If all the blocks associated to track id has moved successfully
* or maximum possible movements done.
*
* <p>
* FAILURE - If any of its(trackId) blocks movement failed and requires to
* retry these failed blocks movements. Example selected target node is no
* more running or no space. So, retrying by selecting new target node might
* work.
*/
public static enum Status {
SUCCESS, FAILURE;
}
/**
* BlocksStorageMovementResult constructor.
*
* @param trackId
* tracking identifier
* @param status
* block movement status
*/
public BlocksStorageMovementResult(long trackId, Status status) {
this.trackId = trackId;
this.status = status;
}
public long getTrackId() {
return trackId;
}
public Status getStatus() {
return status;
}
}

View File

@ -111,6 +111,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
* @param slowPeers Details of peer DataNodes that were detected as being
* slow to respond to packet writes. Empty report if no
* slow peers were detected by the DataNode.
* @param blksMovementResults array of movement status of a set of blocks
* associated to a trackId.
* @throws IOException on error
*/
@Idempotent
@ -124,7 +126,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks)
@Nonnull SlowDiskReports slowDisks,
BlocksStorageMovementResult[] blksMovementResults)
throws IOException;
/**

View File

@ -176,6 +176,18 @@ message BlockStorageMovementProto {
required StorageTypesProto targetStorageTypes = 5;
}
/**
* Movement status of the set of blocks associated to a trackId.
*/
message BlocksStorageMovementResultProto {
enum Status {
SUCCESS = 1; // block movement succeeded
FAILURE = 2; // block movement failed and needs to retry
}
required uint64 trackID = 1;
required Status status = 2;
}
/**
* registration - Information of the datanode registering with the namenode
*/
@ -219,6 +231,7 @@ message VolumeFailureSummaryProto {
* volumeFailureSummary - info about volume failures
* slowPeers - info about peer DataNodes that are suspected to be slow.
* slowDisks - info about DataNode disks that are suspected to be slow.
* blksMovementResults - status of the scheduled blocks movements
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@ -232,6 +245,7 @@ message HeartbeatRequestProto {
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
repeated SlowDiskReportProto slowDisks = 11;
repeated BlocksStorageMovementResultProto blksMovementResults = 12;
}
/**

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@ -116,7 +117,7 @@ private static void runTest(final String testCaseName,
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null, true, SlowPeerReports.EMPTY_REPORT,
SlowDiskReports.EMPTY_REPORT);
SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -167,7 +168,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class))).thenReturn(
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -123,6 +124,8 @@ public void setupMocks() throws Exception {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
.getStoragePolicySatisfyWorker();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@ -157,7 +160,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;
@ -376,6 +380,8 @@ public void testBPInitErrorHandling() throws Exception {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
.getStoragePolicySatisfyWorker();
final AtomicInteger count = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() {
@Override

View File

@ -93,6 +93,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -232,7 +233,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class)))
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@ -172,7 +173,8 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception {
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class));
any(SlowDiskReports.class),
any(BlocksStorageMovementResult[].class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@ -237,7 +239,8 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class));
any(SlowDiskReports.class),
any(BlocksStorageMovementResult[].class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -222,7 +223,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
dn = new DataNode(conf, locations, null, null) {
@Override

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -208,7 +209,8 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean(), any(SlowPeerReports.class),
any(SlowDiskReports.class));
any(SlowDiskReports.class),
(BlocksStorageMovementResult[]) any());
} finally {
lock.writeLock().unlock();
}

View File

@ -34,10 +34,9 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
@ -191,12 +190,12 @@ private void waitForBlockMovementCompletion(
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BlockMovementResult> completedBlocks = worker
.getBlocksMovementsCompletionHandler().getCompletedBlocks();
List<BlocksStorageMovementResult> completedBlocks = worker
.getBlocksMovementsCompletionHandler().getBlksMovementResults();
int failedCount = 0;
for (BlockMovementResult blockMovementResult : completedBlocks) {
if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
blockMovementResult.getStatus()) {
for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
if (blkMovementResult.getStatus() ==
BlocksStorageMovementResult.Status.FAILURE) {
failedCount++;
}
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@ -110,7 +111,8 @@ public void testStorageReportHasStorageTypeAndState() throws IOException {
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
Mockito.any(SlowDiskReports.class),
Mockito.any(BlocksStorageMovementResult[].class));
StorageReport[] reports = captor.getValue();

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -956,8 +957,8 @@ void sendHeartbeat() throws IOException {
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -1007,8 +1008,8 @@ int replicateBlocks() throws IOException {
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@ -130,7 +131,8 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]);
}
public static boolean setReplication(final FSNamesystem ns,

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -139,8 +140,8 @@ public void testDeadDatanode() throws Exception {
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
new BlocksStorageMovementResult[0]).getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -27,6 +28,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
@ -146,6 +148,54 @@ public void testWhenStoragePolicySetToONESSD()
}
}
/**
* Tests to verify that the block storage movement results will be propagated
* to Namenode via datanode heartbeat.
*/
@Test(timeout = 300000)
public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
try {
// Change policy to ONE_SSD
distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas
waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
// TODO: Temporarily using the results from StoragePolicySatisfier class.
// This has to be revisited as part of HDFS-11029.
waitForBlocksMovementResult(1, 30000);
} finally {
hdfsCluster.shutdown();
}
}
private void waitForBlocksMovementResult(int expectedResultsCount,
int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("expectedResultsCount={} actualResultsCount={}",
expectedResultsCount, sps.results.size());
return expectedResultsCount == sps.results.size();
}
}, 100, timeout);
}
private void writeContent(final DistributedFileSystem dfs,
final String fileName) throws IOException {
// write to DISK