diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java new file mode 100644 index 0000000000..580d0d6178 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A monitor class for checking whether block storage movements finished or not. 
+ * If block storage movement results from datanode indicate the movement + * success, then it will just remove the entries from tracking. If it reports + * failure, then it will add back to needed block storage movements list. If no + * DN reports about movement for a longer time, then such items will be retried + * automatically after timeout. The default timeout would be 30mins. + */ +public class BlockStorageMovementAttemptedItems { + public static final Logger LOG = + LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); + // A map holds the items which are already taken for blocks movements + // processing and sent to DNs. + private final Map storageMovementAttemptedItems; + private final List storageMovementAttemptedResults; + private volatile boolean spsRunning = true; + private Daemon timerThread = null; + // + // It might take anywhere between 30 to 60 minutes before + // a request is timed out. + // + private long selfRetryTimeout = 30 * 60 * 1000; + + // + // It might take anywhere between 5 to 10 minutes before + // a request is timed out. + // + private long checkTimeout = 5 * 60 * 1000; // minimum value + private BlockStorageMovementNeeded blockStorageMovementNeeded; + + public BlockStorageMovementAttemptedItems(long timeoutPeriod, + long selfRetryTimeout, + BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) { + if (timeoutPeriod > 0) { + this.checkTimeout = Math.min(checkTimeout, timeoutPeriod); + } + + this.selfRetryTimeout = selfRetryTimeout; + this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; + storageMovementAttemptedItems = new HashMap<>(); + storageMovementAttemptedResults = new ArrayList<>(); + } + + /** + * Add item to block storage movement attempted items map which holds the + * tracking/blockCollection id versus time stamp. 
+ * + * @param blockCollectionID + * - tracking id / block collection id + */ + public void add(Long blockCollectionID) { + synchronized (storageMovementAttemptedItems) { + storageMovementAttemptedItems.put(blockCollectionID, monotonicNow()); + } + } + + /** + * Add the trackIDBlocksStorageMovementResults to + * storageMovementAttemptedResults. + * + * @param blksMovementResults + */ + public void addResults(BlocksStorageMovementResult[] blksMovementResults) { + if (blksMovementResults.length == 0) { + return; + } + synchronized (storageMovementAttemptedResults) { + storageMovementAttemptedResults + .addAll(Arrays.asList(blksMovementResults)); + } + } + + /** + * Starts the monitor thread. + */ + void start() { + timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor()); + timerThread.setName("BlocksStorageMovementAttemptResultMonitor"); + timerThread.start(); + } + + /** + * Stops the monitor thread. + */ + public void stop() { + spsRunning = false; + } + + /** + * A monitor class for checking block storage movement result and long waiting + * items periodically. 
+ */ + private class BlocksStorageMovementAttemptResultMonitor implements Runnable { + @Override + public void run() { + while (spsRunning) { + try { + blockStorageMovementResultCheck(); + blocksStorageMovementUnReportedItemsCheck(); + Thread.sleep(checkTimeout); + } catch (InterruptedException ie) { + LOG.debug("BlocksStorageMovementAttemptResultMonitor thread " + + "is interrupted.", ie); + } + } + } + + private void blocksStorageMovementUnReportedItemsCheck() { + synchronized (storageMovementAttemptedItems) { + Iterator> iter = + storageMovementAttemptedItems.entrySet().iterator(); + long now = monotonicNow(); + while (iter.hasNext()) { + Entry entry = iter.next(); + if (now > entry.getValue() + selfRetryTimeout) { + Long blockCollectionID = entry.getKey(); + synchronized (storageMovementAttemptedResults) { + boolean exist = isExistInResult(blockCollectionID); + if (!exist) { + blockStorageMovementNeeded.add(blockCollectionID); + } else { + LOG.info("Blocks storage movement results for the" + + " tracking id : " + blockCollectionID + + " is reported from one of the co-ordinating datanode." 
+ + " So, the result will be processed soon."); + } + iter.remove(); + } + } + } + + } + } + + private boolean isExistInResult(Long blockCollectionID) { + Iterator iter = + storageMovementAttemptedResults.iterator(); + while (iter.hasNext()) { + BlocksStorageMovementResult storageMovementAttemptedResult = + iter.next(); + if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) { + return true; + } + } + return false; + } + + private void blockStorageMovementResultCheck() { + synchronized (storageMovementAttemptedResults) { + Iterator iter = + storageMovementAttemptedResults.iterator(); + while (iter.hasNext()) { + BlocksStorageMovementResult storageMovementAttemptedResult = + iter.next(); + if (storageMovementAttemptedResult + .getStatus() == BlocksStorageMovementResult.Status.FAILURE) { + blockStorageMovementNeeded + .add(storageMovementAttemptedResult.getTrackId()); + LOG.warn("Blocks storage movement results for the tracking id : " + + storageMovementAttemptedResult.getTrackId() + + " is reported from co-ordinating datanode, but result" + + " status is FAILURE. So, added for retry"); + } else { + synchronized (storageMovementAttemptedItems) { + storageMovementAttemptedItems + .remove(storageMovementAttemptedResult.getTrackId()); + } + LOG.info("Blocks storage movement results for the tracking id : " + + storageMovementAttemptedResult.getTrackId() + + " is reported from co-ordinating datanode. 
" + "The result status is SUCCESS."); + } + iter.remove(); // remove from results as processed above + } + } + + } + } + + @VisibleForTesting + public int resultsCount() { + return storageMovementAttemptedResults.size(); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index fbe686a796..6fa9302229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -69,6 +69,7 @@ public class StoragePolicySatisfier implements Runnable { private final Namesystem namesystem; private final BlockManager blockManager; private final BlockStorageMovementNeeded storageMovementNeeded; + private final BlockStorageMovementAttemptedItems storageMovementsMonitor; public StoragePolicySatisfier(final Namesystem namesystem, final BlockStorageMovementNeeded storageMovementNeeded, @@ -76,15 +77,22 @@ public StoragePolicySatisfier(final Namesystem namesystem, this.namesystem = namesystem; this.storageMovementNeeded = storageMovementNeeded; this.blockManager = blkManager; + // TODO: below selfRetryTimeout and checkTimeout can be configurable later + // Now, the default values of selfRetryTimeout and checkTimeout are 30mins + // and 5mins respectively + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( + 5 * 60 * 1000, 30 * 60 * 1000, storageMovementNeeded); } /** - * Start storage policy satisfier demon thread. + * Start storage policy satisfier daemon thread. Also start block storage + * movements monitor to retry the attempts if needed. 
*/ public void start() { storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); + this.storageMovementsMonitor.start(); } /** @@ -99,6 +107,7 @@ public void stop() { storagePolicySatisfierThread.join(3000); } catch (InterruptedException ie) { } + this.storageMovementsMonitor.stop(); } @Override @@ -108,6 +117,7 @@ public void run() { Long blockCollectionID = storageMovementNeeded.get(); if (blockCollectionID != null) { computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID); + this.storageMovementsMonitor.add(blockCollectionID); } // TODO: We can think to make this as configurable later, how frequently // we want to check block movements. @@ -398,11 +408,6 @@ private List getNodesWithStorages(StorageType type) { } } - // TODO: Temporarily keeping the results for assertion. This has to be - // revisited as part of HDFS-11029. - @VisibleForTesting - List results = new ArrayList<>(); - /** * Receives the movement results of collection of blocks associated to a * trackId. 
@@ -415,6 +420,11 @@ void handleBlocksStorageMovementResults( if (blksMovementResults.length <= 0) { return; } - results.addAll(Arrays.asList(blksMovementResults)); + storageMovementsMonitor.addResults(blksMovementResults); + } + + @VisibleForTesting + BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + return storageMovementsMonitor; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java new file mode 100644 index 0000000000..8c70d99b94 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.util.Time.monotonicNow; +import static org.junit.Assert.*; + +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that block storage movement attempt failures are reported from DN and + * processed them correctly or not. + */ +public class TestBlockStorageMovementAttemptedItems { + + private BlockStorageMovementAttemptedItems bsmAttemptedItems = null; + private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null; + + @Before + public void setup() { + unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(); + bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500, + unsatisfiedStorageMovementFiles); + bsmAttemptedItems.start(); + } + + @After + public void teardown() { + if (bsmAttemptedItems != null) { + bsmAttemptedItems.stop(); + } + } + + private boolean checkItemMovedForRetry(Long item, long retryTimeout) + throws InterruptedException { + long stopTime = monotonicNow() + (retryTimeout * 2); + boolean isItemFound = false; + while (monotonicNow() < (stopTime)) { + Long ele = null; + while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { + if (item.longValue() == ele.longValue()) { + isItemFound = true; + break; + } + } + if (!isItemFound) { + Thread.sleep(100); + } else { + break; + } + } + return isItemFound; + } + + @Test(timeout = 30000) + public void testAddResultWithFailureResult() throws Exception { + Long item = new Long(1234); + bsmAttemptedItems.add(item); + bsmAttemptedItems.addResults( + new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( + item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); + assertTrue(checkItemMovedForRetry(item, 200)); + } + + @Test(timeout = 30000) + public void testAddResultWithSucessResult() throws Exception { + Long item = new Long(1234); + 
bsmAttemptedItems.add(item); + bsmAttemptedItems.addResults( + new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( + item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); + assertFalse(checkItemMovedForRetry(item, 200)); + } + + @Test(timeout = 30000) + public void testNoResultAdded() throws Exception { + Long item = new Long(1234); + bsmAttemptedItems.add(item); + // After selfretry timeout, it should be added back for retry + assertTrue(checkItemMovedForRetry(item, 600)); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index cbfdfc6df8..6f5c71770e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -174,8 +174,6 @@ public void testPerTrackIdBlocksStorageMovementResults() throws Exception { waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000); waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000); - // TODO: Temporarily using the results from StoragePolicySatisfier class. - // This has to be revisited as part of HDFS-11029. waitForBlocksMovementResult(1, 30000); } finally { hdfsCluster.shutdown(); @@ -190,8 +188,10 @@ private void waitForBlocksMovementResult(int expectedResultsCount, @Override public Boolean get() { LOG.info("expectedResultsCount={} actualResultsCount={}", - expectedResultsCount, sps.results.size()); - return expectedResultsCount == sps.results.size(); + expectedResultsCount, + sps.getAttemptedItemsMonitor().resultsCount()); + return expectedResultsCount == sps.getAttemptedItemsMonitor() + .resultsCount(); } }, 100, timeout); }