HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.

Authored by Uma Maheswara Rao G on 2017-12-22 09:10:12 -08:00; committed by Uma Maheswara Rao Gangumalla
parent c561cb316e
commit 78420719eb
10 changed files with 121 additions and 32 deletions
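
This change moves the StoragePolicySatisfier (SPS) and its helper classes out of org.apache.hadoop.hdfs.server.namenode into a new org.apache.hadoop.hdfs.server.namenode.sps package. So that the new package does not reach back into NameNode internals, the satisfier now takes a small StoragePolicySatisfier.Context callback interface, implemented on the NameNode side by the new IntraNNSPSContext class. A minimal, self-contained sketch of that indirection follows; every name in it is a stand-in chosen for illustration, not code from this commit:

    // Sketch of the Context indirection introduced here, with hypothetical types.
    interface SpsContext {                        // plays the role of StoragePolicySatisfier.Context
      int getNumLiveDataNodes();
    }

    class FakeNamesystem {                        // stands in for the NameNode internals
      int liveDataNodes() {
        return 3;
      }
    }

    class IntraContext implements SpsContext {    // plays the role of IntraNNSPSContext
      private final FakeNamesystem namesystem;

      IntraContext(FakeNamesystem namesystem) {
        this.namesystem = namesystem;
      }

      @Override
      public int getNumLiveDataNodes() {
        return namesystem.liveDataNodes();
      }
    }

    public class ContextSketch {
      public static void main(String[] args) {
        SpsContext ctxt = new IntraContext(new FakeNamesystem());
        // The satisfier sees only this narrow interface, never FSDirectory or BlockManager.
        System.out.println("live DataNodes = " + ctxt.getNumLiveDataNodes());
      }
    }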


@@ -89,11 +89,12 @@
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -478,7 +479,8 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
         conf.getBoolean(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    sps = new StoragePolicySatisfier(namesystem, this, conf);
+    StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
+    sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);


@@ -258,6 +258,7 @@
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;


@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+
+/**
+ * This class is the Namenode implementation for analyzing the file blocks which
+ * are expecting to change its storages and assigning the block storage
+ * movements to satisfy the storage policy.
+ */
+// TODO: Now, added one API which is required for sps package. Will refine
+// this interface via HDFS-12911.
+public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
+  private final Namesystem namesystem;
+
+  public IntraNNSPSContext(Namesystem namesystem) {
+    this.namesystem = namesystem;
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
+        .getNumLiveDataNodes();
+  }
+}
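
As the BlockManager hunk above shows, the NameNode constructs this context and feeds it to the satisfier's new four-argument constructor:

    StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
    sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);

The sps package's only view of the NameNode is therefore the Context interface, which the TODO above says HDFS-12911 will refine.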


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -26,8 +26,8 @@
 import java.util.List;

 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@@ -33,8 +33,12 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -367,7 +371,6 @@ protected void checkPauseForTesting() throws InterruptedException {
     @Override
     protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
         throws IOException, InterruptedException {
-      assert getFSDirectory().hasReadLock();
       if (LOG.isTraceEnabled()) {
         LOG.trace("Processing {} for statisy the policy",
             inode.getFullPathName());
@@ -390,12 +393,9 @@ protected boolean canSubmitCurrentBatch() {
     @Override
     protected void checkINodeReady(long startId) throws IOException {
-      FSNamesystem fsn = ((FSNamesystem) namesystem);
-      fsn.checkNameNodeSafeMode("NN is in safe mode,"
-          + "cannot satisfy the policy.");
-      // SPS work should be cancelled when NN goes to standby. Just
-      // double checking for sanity.
-      fsn.checkOperation(NameNode.OperationCategory.WRITE);
+      // SPS work won't be scheduled if NN is in standby. So, skipping NN
+      // standby check.
+      return;
     }

     @Override
@@ -408,8 +408,6 @@ protected void submitCurrentBatch(long startId)
     @Override
     protected void throttle() throws InterruptedException {
-      assert !getFSDirectory().hasReadLock();
-      assert !namesystem.hasReadLock();
       if (LOG.isDebugEnabled()) {
         LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
             + " waiting for some free slots.");


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -47,6 +47,9 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -84,6 +87,18 @@ public class StoragePolicySatisfier implements Runnable {
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
+  private final Context ctxt;
+
+  /**
+   * An interface for analyzing and assigning the block storage movements to
+   * worker nodes.
+   */
+  // TODO: Now, added one API which is required for sps package. Will refine
+  // this interface via HDFS-12911.
+  public interface Context {
+    int getNumLiveDataNodes();
+  }
+
   /**
    * Represents the collective analysis status for all blocks.
    */
@@ -122,7 +137,7 @@ enum Status {
   }

   public StoragePolicySatisfier(final Namesystem namesystem,
-      final BlockManager blkManager, Configuration conf) {
+      final BlockManager blkManager, Configuration conf, Context ctxt) {
     this.namesystem = namesystem;
     this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
         this, conf.getInt(
@@ -141,6 +156,7 @@ public StoragePolicySatisfier(final Namesystem namesystem,
     this.blockMovementMaxRetry = conf.getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
+    this.ctxt = ctxt;
   }

   /**
@@ -313,8 +329,7 @@ public void run() {
           }
         }
       }
-      int numLiveDn = namesystem.getFSDirectory().getBlockManager()
-          .getDatanodeManager().getNumLiveDataNodes();
+      int numLiveDn = ctxt.getNumLiveDataNodes();
       if (storageMovementNeeded.size() == 0
           || blockCount > (numLiveDn * spsWorkMultiplier)) {
         Thread.sleep(3000);
@@ -816,7 +831,7 @@ private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
    * @param moveAttemptFinishedBlks
    *          set of storage movement attempt finished blocks.
    */
-  void handleStorageMovementAttemptFinishedBlks(
+  public void handleStorageMovementAttemptFinishedBlks(
       BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
     if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
       return;
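
The run() hunk above contains the one call site the new interface serves so far: the scheduler paces itself by the live DataNode count, sleeping once it has queued more than numLiveDn * spsWorkMultiplier block moves. A standalone sketch of that pacing rule, with made-up values standing in for the queue and configuration:

    // Sketch of the pacing check in StoragePolicySatisfier#run(); all values are illustrative.
    public class ThrottleSketch {
      public static void main(String[] args) throws InterruptedException {
        int numLiveDn = 5;          // would come from Context.getNumLiveDataNodes()
        int spsWorkMultiplier = 1;  // the configured work multiplier per iteration
        long blockCount = 6;        // block moves scheduled so far
        int queuedItems = 4;        // storageMovementNeeded.size()
        if (queuedItems == 0 || blockCount > (long) numLiveDn * spsWorkMultiplier) {
          Thread.sleep(3000);      // back off, as in the diff
          blockCount = 0L;         // start counting afresh (assumption; reset not shown in this hunk)
        }
        System.out.println("blockCount after throttle = " + blockCount);
      }
    }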


@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides a mechanism for satisfying the storage policy of a
+ * path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.junit.Assert.assertEquals;
@@ -26,9 +26,9 @@
 import java.util.List;

 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
@@ -62,6 +62,10 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
@@ -520,7 +524,7 @@ public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
     try {
       createCluster();
       // Stop SPS
-      hdfsCluster.getNameNode().reconfigurePropertyImpl(
+      hdfsCluster.getNameNode().reconfigureProperty(
           DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "false");
       running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
@@ -531,7 +535,7 @@
           HdfsServerConstants.MOVER_ID_PATH);
       // Restart SPS
-      hdfsCluster.getNameNode().reconfigurePropertyImpl(
+      hdfsCluster.getNameNode().reconfigureProperty(
           DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true");
       running = hdfsCluster.getFileSystem()
@@ -546,7 +550,7 @@
           HdfsServerConstants.MOVER_ID_PATH, true);
       // Restart SPS again
-      hdfsCluster.getNameNode().reconfigurePropertyImpl(
+      hdfsCluster.getNameNode().reconfigureProperty(
          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true");
       running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
@@ -1295,7 +1299,7 @@ public void testTraverseWhenParentDeleted() throws Exception {
     StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     Mockito.when(sps.isRunning()).thenReturn(true);
     BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+        new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
     INode rootINode = fsDir.getINode("/root");
     movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
     movmentNeededQueue.init();
@@ -1358,7 +1362,7 @@ public void testTraverseWhenRootParentDeleted() throws Exception {
     // Queue limit can control the traverse logic to wait for some free
     // entry in queue. After 10 files, traverse control will be on U.
     BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+        new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
     movmentNeededQueue.init();
     INode rootINode = fsDir.getINode("/root");
     movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
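
A side cleanup in this test: SPS is now toggled through NameNode#reconfigureProperty instead of the internal reconfigurePropertyImpl hook. reconfigureProperty is the public entry point inherited from ReconfigurableBase, which verifies the key is reconfigurable before delegating to the Impl method, so the test exercises the same path an operator would:

    hdfsCluster.getNameNode().reconfigureProperty(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "false");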


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.namenode.sps;

 import java.io.IOException;
 import java.util.ArrayList;