HDFS-15842. HDFS mover to emit metrics. (#2738)

LeonGao authored on 2021-06-19 15:39:46 -07:00, committed by GitHub
parent 51991c4907
commit 643dfd60e4
6 changed files with 196 additions and 2 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -398,6 +398,7 @@ private void dispatch() {
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
         LOG.warn("Failed to move " + this, e);
+        nnc.getBlocksFailed().incrementAndGet();
         target.getDDatanode().setHasFailure();
         // Check that the failure is due to block pinning errors.
         if (e instanceof BlockPinningException) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -163,6 +163,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
   private final List<Path> targetPaths;
   private final AtomicLong bytesMoved = new AtomicLong();
   private final AtomicLong blocksMoved = new AtomicLong();
+  private final AtomicLong blocksFailed = new AtomicLong();

   private final int maxNotChangedIterations;
   private int notChangedIterations = 0;
@@ -230,14 +231,18 @@ public String getBlockpoolID() {
     return blockpoolID;
   }

-  AtomicLong getBytesMoved() {
+  public AtomicLong getBytesMoved() {
     return bytesMoved;
   }

-  AtomicLong getBlocksMoved() {
+  public AtomicLong getBlocksMoved() {
     return blocksMoved;
   }

+  public AtomicLong getBlocksFailed() {
+    return blocksFailed;
+  }
+
   public void addBytesMoved(long numBytes) {
     bytesMoved.addAndGet(numBytes);
     blocksMoved.incrementAndGet();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -42,6 +42,8 @@
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.SecurityUtil;
@@ -118,6 +120,8 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
   private final int retryMaxAttempts;
   private final AtomicInteger retryCount;
   private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
+  private final MoverMetrics metrics;
+  private final NameNodeConnector nnc;

   private final BlockStoragePolicy[] blockStoragePolicies;
@@ -155,6 +159,8 @@ Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<
         BlockStoragePolicySuite.ID_BIT_LENGTH];
     this.excludedPinnedBlocks = excludedPinnedBlocks;
+    this.nnc = nnc;
+    this.metrics = MoverMetrics.create(this);
   }

   void init() throws IOException {
@@ -196,6 +202,10 @@ private ExitStatus run() {
     }
   }

+  public NameNodeConnector getNnc() {
+    return nnc;
+  }
+
   DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
       ErasureCodingPolicy ecPolicy) {
     Block blk = lb.getBlock().getLocalBlock();
@@ -296,6 +306,7 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
    * round
    */
   private Result processNamespace() throws IOException {
+    metrics.setProcessingNamespace(true);
     getSnapshottableDirs();
     Result result = new Result();
     for (Path target : targetPaths) {
@@ -322,6 +333,7 @@ private Result processNamespace() throws IOException {
       retryCount.set(0);
     }
     result.updateHasRemaining(hasFailed);
+    metrics.setProcessingNamespace(false);
     return result;
   }
@@ -374,6 +386,7 @@ private void processRecursively(String parent, HdfsFileStatus status,
           // the full path is a snapshot path but it is also included in the
           // current directory tree, thus ignore it.
           processFile(fullPath, (HdfsLocatedFileStatus) status, result);
+          metrics.incrFilesProcessed();
         }
       } catch (IOException e) {
         LOG.warn("Failed to check the status of " + parent
@@ -521,6 +534,7 @@ boolean chooseTargetInSameNode(DBlock db, Source source,
       final PendingMove pm = source.addPendingMove(db, target);
       if (pm != null) {
         dispatcher.executePendingMove(pm);
+        metrics.incrBlocksScheduled();
         return true;
       }
     }
@@ -539,6 +553,7 @@ boolean chooseTarget(DBlock db, Source source,
       final PendingMove pm = source.addPendingMove(db, target);
       if (pm != null) {
         dispatcher.executePendingMove(pm);
+        metrics.incrBlocksScheduled();
         return true;
       }
     }
@@ -650,6 +665,11 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
     Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
     LOG.info("namenodes = " + namenodes);

+    DefaultMetricsSystem.initialize("Mover");
+    JvmMetrics.create("Mover",
+        conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
+        DefaultMetricsSystem.instance());
+
     checkKeytabAndInit(conf);
     List<NameNodeConnector> connectors = Collections.emptyList();
     try {
@@ -818,6 +838,7 @@ public int run(String[] args) throws Exception {
         System.out.println(e + ". Exiting ...");
         return ExitStatus.ILLEGAL_ARGUMENTS.getExitCode();
       } finally {
+        DefaultMetricsSystem.shutdown();
         System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
         System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime));
       }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/MoverMetrics.java (new file)

@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.mover;

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

/**
 * Metrics for HDFS Mover of a blockpool.
 */
@Metrics(about="Mover metrics", context="dfs")
final class MoverMetrics {

  private final Mover mover;

  @Metric("If mover is processing namespace.")
  private MutableGaugeInt processingNamespace;

  @Metric("Number of blocks being scheduled.")
  private MutableCounterLong blocksScheduled;

  @Metric("Number of files being processed.")
  private MutableCounterLong filesProcessed;

  private MoverMetrics(Mover m) {
    this.mover = m;
  }

  public static MoverMetrics create(Mover mover) {
    MoverMetrics m = new MoverMetrics(mover);
    return DefaultMetricsSystem.instance().register(
        m.getName(), null, m);
  }

  String getName() {
    return "Mover-" + mover.getNnc().getBlockpoolID();
  }

  @Metric("Bytes that already moved by mover.")
  public long getBytesMoved() {
    return mover.getNnc().getBytesMoved().get();
  }

  @Metric("Number of blocks that successfully moved by mover.")
  public long getBlocksMoved() {
    return mover.getNnc().getBlocksMoved().get();
  }

  @Metric("Number of blocks that failed moved by mover.")
  public long getBlocksFailed() {
    return mover.getNnc().getBlocksFailed().get();
  }

  void setProcessingNamespace(boolean processingNamespace) {
    this.processingNamespace.set(processingNamespace ? 1 : 0);
  }

  void incrBlocksScheduled() {
    this.blocksScheduled.incr();
  }

  void incrFilesProcessed() {
    this.filesProcessed.incr();
  }
}
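Because Mover.run() (above) calls DefaultMetricsSystem.initialize("Mover") before this source is registered, each per-blockpool MoverMetrics source should also be visible over JMX while the tool is running. A minimal sketch of reading it, assuming the usual metrics2 object-name convention Hadoop:service=<prefix>,name=<source>; the block pool ID and the standalone probe class below are illustrative, and the code would have to run inside the Mover JVM (or be adapted to a remote JMX connector):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MoverMetricsJmxProbe {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Hypothetical block pool ID; substitute the one reported by the NameNode.
    ObjectName name = new ObjectName(
        "Hadoop:service=Mover,name=Mover-BP-123456789-10.0.0.1-1600000000000");
    // Counters come from the @Metric fields, gauges from the annotated getters.
    System.out.println("BlocksScheduled = "
        + mbs.getAttribute(name, "BlocksScheduled"));
    System.out.println("FilesProcessed = "
        + mbs.getAttribute(name, "FilesProcessed"));
    System.out.println("BytesMoved = "
        + mbs.getAttribute(name, "BytesMoved"));
  }
}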

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/package-info.java (new file)

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Mover is a data migration tool for tiered storage.
* It scans provided paths in HDFS to check
* if the block placement satisfies the storage policy.
* For the blocks violating the storage policy,
* it moves the replicas to a different storage type
* in order to fulfill the storage policy requirement.
*/
package org.apache.hadoop.hdfs.server.mover;
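The package javadoc above describes the workflow the new metrics instrument: once a file's storage policy no longer matches where its replicas live, the Mover migrates them. A minimal sketch of creating that mismatch, assuming an illustrative NameNode URI and path; the Mover itself is then started separately, typically with "hdfs mover -p <path>":

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class MarkPathCold {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative NameNode URI and path.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
        URI.create("hdfs://namenode:8020"), conf);
    // Existing DISK replicas now violate the COLD policy, so a subsequent
    // Mover run should schedule them for migration to ARCHIVE storage.
    dfs.setStoragePolicy(new Path("/data/archive-me"), "COLD");
  }
}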

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java

@@ -36,6 +36,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;

 import java.io.File;
 import java.io.IOException;
@@ -86,12 +88,15 @@
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1235,6 +1240,58 @@ public void testMoverWhenStoragePolicyUnset() throws Exception {
     }
   }

+  @Test(timeout=100000)
+  public void testMoverMetrics() throws Exception {
+    long blockSize = 10*1024*1024;
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+    cluster.waitActive();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+
+    final String file = "/testMaxIterationTime.dat";
+    final Path path = new Path(file);
+    short repFactor = 1;
+    int seed = 0xFAFAFA;
+    // write to DISK
+    DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed);
+
+    // move to ARCHIVE
+    fs.setStoragePolicy(new Path(file), "COLD");
+
+    Map<URI, List<Path>> nnWithPath = new HashMap<>();
+    List<Path> paths = new ArrayList<>();
+    paths.add(path);
+    nnWithPath
+        .put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(), paths);
+
+    Mover.run(nnWithPath, conf);
+
+    final String moverMetricsName = "Mover-"
+        + cluster.getNameNode(0).getNamesystem().getBlockPoolId();
+    MetricsSource moverMetrics =
+        DefaultMetricsSystem.instance().getSource(moverMetricsName);
+    assertNotNull(moverMetrics);
+
+    MetricsRecordBuilder rb = MetricsAsserts.getMetrics(moverMetricsName);
+    // Check metrics
+    assertEquals(4, MetricsAsserts.getLongCounter("BlocksScheduled", rb));
+    assertEquals(1, MetricsAsserts.getLongCounter("FilesProcessed", rb));
+    assertEquals(41943040, MetricsAsserts.getLongGauge("BytesMoved", rb));
+    assertEquals(4, MetricsAsserts.getLongGauge("BlocksMoved", rb));
+    assertEquals(0, MetricsAsserts.getLongGauge("BlocksFailed", rb));
+  }
+
   private void createFileWithFavoredDatanodes(final Configuration conf,
       final MiniDFSCluster cluster, final DistributedFileSystem dfs)
       throws IOException {