HDFS-13075. [SPS]: Provide External Context implementation. Contributed by Uma Maheswara Rao G.

Surendra Singh Lilhore 2018-01-28 20:46:56 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 3b83110d5e
commit 99594b48b8
15 changed files with 652 additions and 54 deletions

View File

@@ -23,6 +23,8 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
@@ -33,10 +35,16 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -257,4 +265,63 @@ public IsRollingUpgradeResponseProto isRollingUpgrade(
return IsRollingUpgradeResponseProto.newBuilder()
.setIsRollingUpgrade(isRollingUpgrade).build();
}
@Override
public GetNextSPSPathIdResponseProto getNextSPSPathId(
RpcController controller, GetNextSPSPathIdRequestProto request)
throws ServiceException {
try {
Long nextSPSPathId = impl.getNextSPSPathId();
if (nextSPSPathId == null) {
return GetNextSPSPathIdResponseProto.newBuilder().build();
}
return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetFilePathResponseProto getFilePath(RpcController controller,
GetFilePathRequestProto request) throws ServiceException {
try {
return GetFilePathResponseProto.newBuilder()
.setSrcPath(impl.getFilePath(request.getFileId())).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
RpcController controller, CheckDNSpaceRequestProto request)
throws ServiceException {
try {
return CheckDNSpaceResponseProto.newBuilder()
    .setIsGoodDatanodeWithSpace(impl.checkDNSpaceForScheduling(
        PBHelperClient.convert(request.getDnInfo()),
        PBHelperClient.convertStorageType(request.getStorageType()),
        request.getEstimatedSize()))
    .build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
RpcController controller, HasLowRedundancyBlocksRequestProto request)
throws ServiceException {
try {
return HasLowRedundancyBlocksResponseProto.newBuilder()
.setHasLowRedundancyBlocks(
impl.hasLowRedundancyBlocks(request.getInodeId()))
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
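A note on the convention in getNextSPSPathId above: "no pending SPS path" is modeled as an absent optional field rather than a sentinel value. A minimal sketch of the round trip, using only the response proto added by this patch (the inode id is an arbitrary example):

    GetNextSPSPathIdResponseProto empty =
        GetNextSPSPathIdResponseProto.newBuilder().build();
    assert !empty.hasFileId(); // the server had nothing to hand out

    GetNextSPSPathIdResponseProto some =
        GetNextSPSPathIdResponseProto.newBuilder().setFileId(16386L).build();
    Long next = some.hasFileId() ? some.getFileId() : null; // 16386

The client-side translator in the next file applies exactly this hasFileId() check to map the absent field back to null.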

View File

@@ -22,18 +22,24 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -263,4 +269,56 @@ public boolean isRollingUpgrade() throws IOException {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Long getNextSPSPathId() throws IOException {
GetNextSPSPathIdRequestProto req =
GetNextSPSPathIdRequestProto.newBuilder().build();
try {
GetNextSPSPathIdResponseProto nextSPSPathId =
rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req);
return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public String getFilePath(Long inodeId) throws IOException {
GetFilePathRequestProto req =
GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
try {
return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException {
CheckDNSpaceRequestProto req = CheckDNSpaceRequestProto.newBuilder()
.setDnInfo(PBHelperClient.convert(dn))
.setStorageType(PBHelperClient.convertStorageType(type))
.setEstimatedSize(estimatedSize).build();
try {
return rpcProxy.checkDNSpaceForScheduling(NULL_CONTROLLER, req)
.getIsGoodDatanodeWithSpace();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
.newBuilder().setInodeId(inodeId).build();
try {
return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
.getHasLowRedundancyBlocks();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}
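Taken together, the four translator methods above give an external process a complete polling surface. A hedged sketch of the loop an external SPS might run (only the four NamenodeProtocol calls come from this patch; the method, its parameters, and the ARCHIVE storage choice are illustrative):

    void drainSPSPaths(NamenodeProtocol namenode, DatanodeInfo candidateDn,
        long blockSize) throws IOException {
      Long pathId;
      while ((pathId = namenode.getNextSPSPathId()) != null) {
        String src = namenode.getFilePath(pathId);
        if (namenode.hasLowRedundancyBlocks(pathId)) {
          continue; // let replication repair this file before moving blocks
        }
        boolean fits = namenode.checkDNSpaceForScheduling(
            candidateDn, StorageType.ARCHIVE, blockSize);
        // ... when fits is true, schedule block moves for src ...
      }
    }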

View File

@@ -66,7 +66,8 @@ public class NameNodeConnector implements Closeable {
public static final int DEFAULT_MAX_IDLE_ITERATIONS = 5;
private static boolean write2IdFile = true;
private static boolean checkOtherInstanceRunning = true;
/** Create {@link NameNodeConnector} for the given namenodes. */
public static List<NameNodeConnector> newNameNodeConnectors(
Collection<URI> namenodes, String name, Path idPath, Configuration conf,
@@ -101,6 +102,11 @@ public static void setWrite2IdFile(boolean write2IdFile) {
NameNodeConnector.write2IdFile = write2IdFile;
}
@VisibleForTesting
public static void checkOtherInstanceRunning(boolean toCheck) {
NameNodeConnector.checkOtherInstanceRunning = toCheck;
}
private final URI nameNodeUri;
private final String blockpoolID;
@@ -111,7 +117,7 @@ public static void setWrite2IdFile(boolean write2IdFile) {
private final DistributedFileSystem fs;
private final Path idPath;
private final OutputStream out;
private OutputStream out;
private final List<Path> targetPaths;
private final AtomicLong bytesMoved = new AtomicLong();
@@ -141,10 +147,12 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
this.keyManager = new KeyManager(blockpoolID, namenode,
defaults.getEncryptDataTransfer(), conf);
// if the instance check is disabled (e.g. in tests), do not create the id file
out = checkAndMarkRunning();
if (out == null) {
// Exit if there is another one running.
throw new IOException("Another " + name + " is running.");
if (checkOtherInstanceRunning) {
out = checkAndMarkRunning();
if (out == null) {
// Exit if there is another one running.
throw new IOException("Another " + name + " is running.");
}
}
}
@@ -285,13 +293,19 @@ public void close() {
IOUtils.closeStream(out);
if (fs != null) {
try {
fs.delete(idPath, true);
if (checkOtherInstanceRunning) {
fs.delete(idPath, true);
}
} catch(IOException ioe) {
LOG.warn("Failed to delete " + idPath, ioe);
}
}
}
public NamenodeProtocol getNNProtocolConnection() {
return this.namenode;
}
@Override
public String toString() {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
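The two additions above, checkOtherInstanceRunning(boolean) and getNNProtocolConnection(), are what let a process running outside the NameNode reuse the balancer's connector plumbing without competing for the mover id file. A sketch of the intended call pattern, mirroring getNameNodeConnector() in the new ExternalSPSContext later in this patch (conf is an ordinary HdfsConfiguration):

    NameNodeConnector.checkOtherInstanceRunning(false);
    List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
        DFSUtil.getInternalNsRpcUris(conf), "ExternalSPSContext",
        HdfsServerConstants.MOVER_ID_PATH, conf,
        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
    NamenodeProtocol namenode = nncs.get(0).getNNProtocolConnection();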

View File

@@ -5020,6 +5020,25 @@ public ProvidedStorageMap getProvidedStorageMap() {
return providedStorageMap;
}
/**
* Check whether the file with the given inode id has any low-redundancy
* blocks.
*
* @param inodeID
* - inode id
* @return true if the file has any low-redundancy block.
*/
public boolean hasLowRedundancyBlocks(long inodeID) {
namesystem.readLock();
try {
BlockCollection bc = namesystem.getBlockCollection(inodeID);
if (bc == null) {
return false;
}
return hasLowRedundancyBlocks(bc);
} finally {
namesystem.readUnlock();
}
}
/**
* Gets the storage policy satisfier instance.
*

View File

@@ -30,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -2067,5 +2068,22 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
}
return reports;
}
public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long estimatedSize) {
namesystem.readLock();
try {
DatanodeDescriptor datanode =
blockManager.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
if (datanode == null) {
LOG.debug("Target datanode: " + dn + " doesn't exists");
return false;
}
return null != datanode.chooseStorage4Block(type, estimatedSize);
} finally {
namesystem.readUnlock();
}
}
}

View File

@@ -365,8 +365,7 @@ enum BlockUCState {
String XATTR_ERASURECODING_POLICY =
"system.hdfs.erasurecoding.policy";
String XATTR_SATISFY_STORAGE_POLICY =
"system.hdfs.satisfy.storage.policy";
String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr";
Path MOVER_ID_PATH = new Path("/system/mover.id");
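Note the namespace change above: the SPS xattr moves from the reserved "system." namespace to "user.hdfs.sps.xattr", presumably because system xattrs cannot be read or removed through the public FileSystem API, while the external SPS has to clear the hint as an ordinary HDFS client. That is exactly what ExternalSPSContext#removeSPSHint later in this patch does; a one-line sketch (dfs is a DistributedFileSystem, the path is an example):

    dfs.removeXAttr(new Path("/some/file"),
        HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); // "user.hdfs.sps.xattr"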

View File

@@ -2537,10 +2537,15 @@ public List<String> listReconfigurableProperties() throws IOException {
@Override
public boolean isStoragePolicySatisfierRunning() throws IOException {
checkNNStartup();
String operationName = "isStoragePolicySatisfierRunning";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().isStoragePolicySatisfierRunning();
boolean isSPSRunning =
namesystem.getBlockManager().isStoragePolicySatisfierRunning();
namesystem.logAuditEvent(true, operationName, null);
return isSPSRunning;
}
@Override
@@ -2553,4 +2558,50 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus(
path);
}
@Override
public String getFilePath(Long inodeId) throws IOException {
checkNNStartup();
String operationName = "getFilePath";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getFilePath(inodeId);
}
@Override
public Long getNextSPSPathId() throws IOException {
checkNNStartup();
String operationName = "getNextSPSPathId";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().getNextSPSPathId();
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn,
StorageType type, long estimatedSize) throws IOException {
checkNNStartup();
String operationName = "checkDNSpaceForScheduling";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().getDatanodeManager()
.verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
}
@Override
public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
checkNNStartup();
String operationName = "hasLowRedundancyBlocks";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
}
}
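All four handlers above share the same guard sequence: checkNNStartup(), a superuser check, and rejection on the standby. The practical consequence is that an external SPS daemon must authenticate as (or proxy as) the HDFS superuser before opening the NamenodeProtocol proxy. A hedged sketch for a secure cluster (principal and keytab path are placeholders, not values from this patch):

    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(
        "hdfs/host.example.com@EXAMPLE.COM",
        "/etc/security/keytabs/hdfs.keytab");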

View File

@@ -319,12 +319,16 @@ public void run() {
String reClass = t.getClass().getName();
if (InterruptedException.class.getName().equals(reClass)) {
LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
Thread.currentThread().interrupt();
break;
}
LOG.warn("Exception while scanning file inodes to satisfy the policy",
t);
// TODO: maybe we should retry the current inode id?
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
break;
}
}
}
}

View File

@@ -149,8 +149,8 @@ DatanodeStorageReport[] getLiveDatanodeStorageReport()
* @return true if the given datanode has sufficient space to occupy blockSize
* data, false otherwise.
*/
boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize);
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long blockSize);
/**
* @return next SPS path id to process.
@@ -175,4 +175,9 @@ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
*/
String getFilePath(Long inodeId);
/**
* Close the resources.
*/
void close() throws IOException;
}
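With close() added to the Context interface, the caller owns the context lifecycle: the intra-NameNode implementation below treats it as a no-op, while the external implementation closes its NameNodeConnector. A sketch of the expected usage (error handling elided):

    Context ctxt = new ExternalSPSContext(spsService); // or the intra-NN context
    try {
      // ... run the satisfier against ctxt ...
    } finally {
      ctxt.close(); // no-op for IntraSPSNameNodeContext
    }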

View File

@@ -30,7 +30,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -98,17 +97,8 @@ public DatanodeStorageReport[] getLiveDatanodeStorageReport()
}
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
namesystem.readLock();
try {
BlockCollection bc = namesystem.getBlockCollection(inodeID);
if (bc == null) {
return false;
}
return blockManager.hasLowRedundancyBlocks(bc);
} finally {
namesystem.readUnlock();
}
public boolean hasLowRedundancyBlocks(long inodeId) {
return blockManager.hasLowRedundancyBlocks(inodeId);
}
@Override
@@ -170,8 +160,8 @@ public long getFileID(String path) throws UnresolvedLinkException,
}
@Override
public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize) {
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long blockSize) {
namesystem.readLock();
try {
DatanodeDescriptor datanode = blockManager.getDatanodeManager()
@@ -205,4 +195,9 @@ public void removeAllSPSPathIds() {
public String getFilePath(Long inodeId) {
return namesystem.getFilePath(inodeId);
}
@Override
public void close() throws IOException {
// Nothing to clean.
}
}

View File

@@ -325,6 +325,9 @@ public void run() {
}
}
}
} else {
LOG.info("Namenode is in safemode. It will retry again.");
Thread.sleep(3000);
}
int numLiveDn = ctxt.getNumLiveDataNodes();
if (storageMovementNeeded.size() == 0
@@ -706,8 +709,8 @@ private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
DatanodeInfo source, List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
source, t, blockInfo.getBlockSize());
boolean goodTargetDn =
ctxt.checkDNSpaceForScheduling(source, t, blockInfo.getBlockSize());
if (goodTargetDn) {
return new StorageTypeNodePair(t, source);
}
@@ -720,8 +723,8 @@ private StorageTypeNodePair chooseTarget(LocatedBlock block,
StorageTypeNodeMap locsForExpectedStorageTypes,
List<DatanodeInfo> excludeNodes) {
for (StorageType t : targetTypes) {
List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes
.getNodesWithStorages(t);
List<DatanodeInfo> nodesWithStorages =
locsForExpectedStorageTypes.getNodesWithStorages(t);
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
continue; // no target nodes with the required storage type.
}
@@ -729,8 +732,8 @@ private StorageTypeNodePair chooseTarget(LocatedBlock block,
for (DatanodeInfo target : nodesWithStorages) {
if (!excludeNodes.contains(target)
&& matcher.match(ctxt.getNetworkTopology(), source, target)) {
boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
target, t, block.getBlockSize());
boolean goodTargetDn =
ctxt.checkDNSpaceForScheduling(target, t, block.getBlockSize());
if (goodTargetDn) {
return new StorageTypeNodePair(t, target);
}

View File

@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -31,7 +32,8 @@
/*****************************************************************************
* Protocol that a secondary NameNode uses to communicate with the NameNode.
* It's used to get part of the name node state
* Also used by external storage policy satisfier. It's used to get part of the
* name node state
*****************************************************************************/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
@@ -202,5 +204,47 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
*/
@Idempotent
boolean isRollingUpgrade() throws IOException;
/**
* Gets the file path for the given file id. This API is used by the
* external SPS.
*
* @param inodeId
* - file inode id.
* @return the file path
*/
@Idempotent
String getFilePath(Long inodeId) throws IOException;
/**
* @return the next available SPS path id, otherwise null. This API is used
*         by the external SPS.
*/
@AtMostOnce
Long getNextSPSPathId() throws IOException;
/**
* Verifies whether the given datanode has enough space of the given
* storage type to schedule a block move of the estimated size. This API is
* used by the external SPS.
*
* @param dn
* - datanode
* @param type
* - storage type
* @param estimatedSize
* - estimated block size
* @return true if the datanode can accommodate the block.
*/
@Idempotent
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException;
/**
* Checks whether the file with the given inode id has any low-redundancy
* blocks. This API is used by the external SPS.
*
* @param inodeID
* - inode id.
* @return true if the file has any low-redundancy block.
*/
@Idempotent
boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
}
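Note the retry annotations above: getFilePath, checkDNSpaceForScheduling, and hasLowRedundancyBlocks are @Idempotent, but getNextSPSPathId is @AtMostOnce, since retrying a call that already dequeued an id on the server would silently skip a path. A sketch of the caller-side consequence (process() is a hypothetical handler):

    try {
      Long id = namenode.getNextSPSPathId();
      if (id != null) {
        process(id);
      }
    } catch (IOException e) {
      // Do not assume the queue is empty; the id may or may not have been
      // dequeued on the server. Back off and poll again.
    }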

View File

@@ -0,0 +1,271 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class connects to the Namenode and fetches the information required
* by the SPS from the Namenode state.
*/
@InterfaceAudience.Private
public class ExternalSPSContext implements Context {
public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSContext.class);
private SPSService service;
private NameNodeConnector nnc = null;
private final Object nnConnectionLock = new Object();
private final BlockStoragePolicySuite policySuite =
    BlockStoragePolicySuite.createDefaultSuite();
public ExternalSPSContext(SPSService service) {
this.service = service;
initializeNamenodeConnector();
}
@Override
public boolean isRunning() {
return service.isRunning();
}
@Override
public boolean isInSafeMode() {
initializeNamenodeConnector();
try {
return nnc != null && nnc.getDistributedFileSystem().isInSafeMode();
} catch (IOException e) {
LOG.warn("Exception while creating Namenode Connector..", e);
return false;
}
}
@Override
public boolean isMoverRunning() {
initializeNamenodeConnector();
try {
FSDataOutputStream out = nnc.getDistributedFileSystem()
.append(HdfsServerConstants.MOVER_ID_PATH);
out.close();
return false;
} catch (IOException ioe) {
LOG.warn("Exception while checking mover is running..", ioe);
return true;
}
}
@Override
public long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException {
initializeNamenodeConnector();
HdfsFileStatus fs = null;
try {
fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
new Path(path));
LOG.info("Fetched the fileID:{} for the path:{}", fs.getFileId(), path);
} catch (IllegalArgumentException | IOException e) {
LOG.warn("Exception while getting file is for the given path:{}.", path,
e);
}
return fs != null ? fs.getFileId() : 0;
}
@Override
public NetworkTopology getNetworkTopology() {
return NetworkTopology.getInstance(service.getConf());
}
@Override
public boolean isFileExist(long inodeId) {
initializeNamenodeConnector();
String filePath = null;
try {
filePath = getFilePath(inodeId);
return nnc.getDistributedFileSystem().exists(new Path(filePath));
} catch (IllegalArgumentException | IOException e) {
LOG.warn("Exception while getting file is for the given path:{} "
+ "and fileId:{}", filePath, inodeId, e);
}
return false;
}
@Override
public BlockStoragePolicy getStoragePolicy(byte policyId) {
return policySuite.getPolicy(policyId);
}
@Override
public void addDropPreviousSPSWorkAtDNs() {
// Nothing to do.
}
@Override
public void removeSPSHint(long inodeId) throws IOException {
initializeNamenodeConnector();
nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
}
@Override
public int getNumLiveDataNodes() {
initializeNamenodeConnector();
try {
return nnc.getDistributedFileSystem()
.getDataNodeStats(DatanodeReportType.LIVE).length;
} catch (IOException e) {
LOG.warn("Exception while getting number of live datanodes.", e);
}
return 0;
}
@Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
initializeNamenodeConnector();
return nnc.getDistributedFileSystem().getClient()
.getLocatedFileInfo(getFilePath(inodeID), false);
}
@Override
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
initializeNamenodeConnector();
return nnc.getLiveDatanodeStorageReport();
}
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
} catch (IOException e) {
LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
inodeID, e);
return false;
}
}
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
estimatedSize);
} catch (IOException e) {
LOG.warn("Verify the given datanode:{} is good and has "
+ "estimated space in it.", dn, e);
return false;
}
}
@Override
public Long getNextSPSPathId() {
initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().getNextSPSPathId();
} catch (IOException e) {
LOG.warn("Exception while getting next sps path id from Namenode.", e);
return null;
}
}
@Override
public void removeSPSPathId(long pathId) {
// No specific implementation is needed for the external SPS.
}
@Override
public void removeAllSPSPathIds() {
// No specific implementation is needed for the external SPS.
}
@Override
public String getFilePath(Long inodeId) {
try {
return nnc.getNNProtocolConnection().getFilePath(inodeId);
} catch (IOException e) {
LOG.warn("Exception while getting file path id:{} from Namenode.",
inodeId, e);
return null;
}
}
@Override
public void close() throws IOException {
synchronized (nnConnectionLock) {
if (nnc != null) {
nnc.close();
}
}
}
private void initializeNamenodeConnector() {
synchronized (nnConnectionLock) {
if (nnc == null) {
try {
nnc = getNameNodeConnector(service.getConf());
} catch (IOException e) {
LOG.warn("Exception while creating Namenode Connector.."
+ "Namenode might not have started.", e);
}
}
}
}
public static NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
List<NameNodeConnector> nncs = Collections.emptyList();
NameNodeConnector.checkOtherInstanceRunning(false);
nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
ExternalSPSContext.class.getSimpleName(),
HdfsServerConstants.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
}
}
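Putting the new context to work: the test changes below swap it in for IntraSPSNameNodeContext with a single constructor call. A condensed, hedged sketch of one scheduling pass through the external context (error handling elided; the enclosing method is assumed to throw IOException):

    ExternalSPSContext ctxt = new ExternalSPSContext(spsService);
    try {
      if (!ctxt.isInSafeMode() && !ctxt.isMoverRunning()) {
        Long pathId = ctxt.getNextSPSPathId();
        if (pathId != null) {
          HdfsFileStatus status = ctxt.getFileInfo(pathId);
          // ... compute and dispatch block moves for status ...
          ctxt.removeSPSHint(pathId); // clears user.hdfs.sps.xattr
        }
      }
    } finally {
      ctxt.close();
    }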

View File

@@ -206,6 +206,39 @@ message IsRollingUpgradeResponseProto {
required bool isRollingUpgrade = 1;
}
message GetFilePathRequestProto {
required uint64 fileId = 1;
}
message GetFilePathResponseProto {
required string srcPath = 1;
}
message GetNextSPSPathIdRequestProto {
}
message GetNextSPSPathIdResponseProto {
optional uint64 fileId = 1;
}
message CheckDNSpaceRequestProto {
required DatanodeInfoProto dnInfo = 1;
required StorageTypeProto storageType = 2;
required uint64 estimatedSize = 3;
}
message CheckDNSpaceResponseProto {
required bool isGoodDatanodeWithSpace = 1;
}
message HasLowRedundancyBlocksRequestProto {
required uint64 inodeId = 1;
}
message HasLowRedundancyBlocksResponseProto {
required bool hasLowRedundancyBlocks = 1;
}
/**
* Protocol used by the subordinate namenode to send requests to
* the active/primary namenode.
@@ -287,4 +320,28 @@ service NamenodeProtocolService {
*/
rpc isRollingUpgrade(IsRollingUpgradeRequestProto)
returns (IsRollingUpgradeResponseProto);
/**
* Return the corresponding file path for the given file id.
*/
rpc getFilePath(GetFilePathRequestProto)
returns (GetFilePathResponseProto);
/**
* Return the next SPS path id from the namenode, if any.
*/
rpc getNextSPSPathId(GetNextSPSPathIdRequestProto)
returns (GetNextSPSPathIdResponseProto);
/**
* Verify whether the given datanode has space to schedule a block move.
*/
rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto)
returns (CheckDNSpaceResponseProto);
/**
* Check whether the given file id has low-redundancy blocks.
*/
rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto)
returns (HasLowRedundancyBlocksResponseProto);
}

View File

@@ -37,7 +37,6 @@
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
@@ -96,14 +95,8 @@ public MiniDFSCluster startCluster(final Configuration conf,
SPSService spsService = blkMgr.getSPSService();
spsService.stopGracefully();
// TODO: Since External is not fully implemented, just used INTERNAL now.
// Need to set External context here.
IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
cluster.getNameNode().getNamesystem(), blkMgr, blkMgr.getSPSService()) {
public boolean isRunning() {
return true;
};
};
ExternalSPSContext context = new ExternalSPSContext(spsService);
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -131,15 +124,7 @@ public void restartNamenode() throws IOException{
spsService = blkMgr.getSPSService();
spsService.stopGracefully();
// TODO: Since External is not fully implemented, just used INTERNAL now.
// Need to set External context here.
IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
getCluster().getNameNode().getNamesystem(), blkMgr,
blkMgr.getSPSService()) {
public boolean isRunning() {
return true;
};
};
ExternalSPSContext context = new ExternalSPSContext(spsService);
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -180,7 +165,7 @@ private NameNodeConnector getNameNodeConnector(Configuration conf)
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
final Path externalSPSPathId = new Path("/system/externalSPS.id");
final Path externalSPSPathId = new Path("/system/tmp.id");
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(nnMap,
StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
@@ -204,6 +189,14 @@ public void testBatchProcessingForSPSDirectory() throws Exception {
public void testStoragePolicySatisfyPathStatus() throws Exception {
}
/**
* This test case is specific to the internal SPS.
*/
@Ignore("This test is specific to internal, so skipping here.")
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
throws Exception {
}
/**
* Status is not yet supported for the external SPS, so ignoring it.
*/