diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index 90c2c49280..25eafdf286 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto; @@ -33,10 +35,16 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; @@ -257,4 +265,63 @@ public IsRollingUpgradeResponseProto isRollingUpgrade( return IsRollingUpgradeResponseProto.newBuilder() .setIsRollingUpgrade(isRollingUpgrade).build(); } + + @Override + public GetNextSPSPathIdResponseProto getNextSPSPathId( + RpcController controller, GetNextSPSPathIdRequestProto request) + throws ServiceException { + try { + Long nextSPSPathId = impl.getNextSPSPathId(); + if (nextSPSPathId == null) { + return GetNextSPSPathIdResponseProto.newBuilder().build(); + } + return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId) + .build(); + } catch (IOException e) { + throw new 
ServiceException(e); + } + } + + @Override + public GetFilePathResponseProto getFilePath(RpcController controller, + GetFilePathRequestProto request) throws ServiceException { + try { + return GetFilePathResponseProto.newBuilder() + .setSrcPath(impl.getFilePath(request.getFileId())).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CheckDNSpaceResponseProto checkDNSpaceForScheduling( + RpcController controller, CheckDNSpaceRequestProto request) + throws ServiceException { + try { + CheckDNSpaceResponseProto build = CheckDNSpaceResponseProto.newBuilder() + .setIsGoodDatanodeWithSpace(impl.checkDNSpaceForScheduling( + PBHelperClient.convert(request.getDnInfo()), + PBHelperClient.convertStorageType(request.getStorageType()), + request.getEstimatedSize())) + .build(); + return build; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks( + RpcController controller, HasLowRedundancyBlocksRequestProto request) + throws ServiceException { + try { + return HasLowRedundancyBlocksResponseProto.newBuilder() + .setHasLowRedundancyBlocks( + impl.hasLowRedundancyBlocks(request.getInodeId())) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 632f8b7d74..8bff499650 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -22,18 +22,24 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.CheckDNSpaceRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; +import 
org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; @@ -263,4 +269,56 @@ public boolean isRollingUpgrade() throws IOException { throw ProtobufHelper.getRemoteException(e); } } + + @Override + public Long getNextSPSPathId() throws IOException { + GetNextSPSPathIdRequestProto req = + GetNextSPSPathIdRequestProto.newBuilder().build(); + try { + GetNextSPSPathIdResponseProto nextSPSPathId = + rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req); + return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public String getFilePath(Long inodeId) throws IOException { + GetFilePathRequestProto req = + GetFilePathRequestProto.newBuilder().setFileId(inodeId).build(); + try { + return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, + long estimatedSize) throws IOException { + CheckDNSpaceRequestProto req = CheckDNSpaceRequestProto.newBuilder() + .setDnInfo(PBHelperClient.convert(dn)) + .setStorageType(PBHelperClient.convertStorageType(type)) + .setEstimatedSize(estimatedSize).build(); + try { + return rpcProxy.checkDNSpaceForScheduling(NULL_CONTROLLER, req) + .getIsGoodDatanodeWithSpace(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public boolean hasLowRedundancyBlocks(long inodeId) throws IOException { + HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto + .newBuilder().setInodeId(inodeId).build(); + try { + return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req) + .getHasLowRedundancyBlocks(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 6bfbbb3f9c..2b3c1935e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -66,7 +66,8 @@ public class NameNodeConnector implements Closeable { public static final int DEFAULT_MAX_IDLE_ITERATIONS = 5; private static boolean write2IdFile = true; - + private static boolean checkOtherInstanceRunning = true; + /** Create {@link NameNodeConnector} for the given namenodes. 
*/ public static List newNameNodeConnectors( Collection namenodes, String name, Path idPath, Configuration conf, @@ -101,6 +102,11 @@ public static void setWrite2IdFile(boolean write2IdFile) { NameNodeConnector.write2IdFile = write2IdFile; } + @VisibleForTesting + public static void checkOtherInstanceRunning(boolean toCheck) { + NameNodeConnector.checkOtherInstanceRunning = toCheck; + } + private final URI nameNodeUri; private final String blockpoolID; @@ -111,7 +117,7 @@ public static void setWrite2IdFile(boolean write2IdFile) { private final DistributedFileSystem fs; private final Path idPath; - private final OutputStream out; + private OutputStream out; private final List targetPaths; private final AtomicLong bytesMoved = new AtomicLong(); @@ -141,10 +147,12 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath, this.keyManager = new KeyManager(blockpoolID, namenode, defaults.getEncryptDataTransfer(), conf); // if it is for test, we do not create the id file - out = checkAndMarkRunning(); - if (out == null) { - // Exit if there is another one running. - throw new IOException("Another " + name + " is running."); + if (checkOtherInstanceRunning) { + out = checkAndMarkRunning(); + if (out == null) { + // Exit if there is another one running. + throw new IOException("Another " + name + " is running."); + } } } @@ -285,13 +293,19 @@ public void close() { IOUtils.closeStream(out); if (fs != null) { try { - fs.delete(idPath, true); + if (checkOtherInstanceRunning) { + fs.delete(idPath, true); + } } catch(IOException ioe) { LOG.warn("Failed to delete " + idPath, ioe); } } } + public NamenodeProtocol getNNProtocolConnection() { + return this.namenode; + } + @Override public String toString() { return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 9ef10456ea..ac6d44b035 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -5020,6 +5020,25 @@ public ProvidedStorageMap getProvidedStorageMap() { return providedStorageMap; } + /** + * Check whether file id has low redundancy blocks. + * + * @param inodeID + * - inode id + */ + public boolean hasLowRedundancyBlocks(long inodeID) { + namesystem.readLock(); + try { + BlockCollection bc = namesystem.getBlockCollection(inodeID); + if (bc == null) { + return false; + } + return hasLowRedundancyBlocks(bc); + } finally { + namesystem.readUnlock(); + } + } + /** * Gets the storage policy satisfier instance. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index c24a38bc2a..3542864310 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -2067,5 +2068,22 @@ public DatanodeStorageReport[] getDatanodeStorageReport( } return reports; } + + public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, + StorageType type, long estimatedSize) { + namesystem.readLock(); + try { + DatanodeDescriptor datanode = + blockManager.getDatanodeManager().getDatanode(dn.getDatanodeUuid()); + if (datanode == null) { + LOG.debug("Target datanode: " + dn + " doesn't exist"); + return false; + } + return null != datanode.chooseStorage4Block(type, estimatedSize); + } finally { + namesystem.readUnlock(); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 42a2fc6ff1..1378de2575 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -365,8 +365,7 @@ enum BlockUCState { String XATTR_ERASURECODING_POLICY = "system.hdfs.erasurecoding.policy"; - String XATTR_SATISFY_STORAGE_POLICY = - "system.hdfs.satisfy.storage.policy"; + String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr"; Path MOVER_ID_PATH = new Path("/system/mover.id"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4738bf58e5..0e509656e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2537,10 +2537,15 @@ public List listReconfigurableProperties() throws IOException { @Override public boolean isStoragePolicySatisfierRunning() throws IOException { checkNNStartup(); + String operationName = "isStoragePolicySatisfierRunning"; + namesystem.checkSuperuserPrivilege(operationName); if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - return namesystem.getBlockManager().isStoragePolicySatisfierRunning(); + boolean isSPSRunning = + namesystem.getBlockManager().isStoragePolicySatisfierRunning(); + namesystem.logAuditEvent(true, operationName, null); + return isSPSRunning; } @Override @@ -2553,4 +2558,50 @@ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( return
namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus( path); } + + @Override + public String getFilePath(Long inodeId) throws IOException { + checkNNStartup(); + String operationName = "getFilePath"; + namesystem.checkSuperuserPrivilege(operationName); + if (nn.isStandbyState()) { + throw new StandbyException("Not supported by Standby Namenode."); + } + return namesystem.getFilePath(inodeId); + } + + @Override + public Long getNextSPSPathId() throws IOException { + checkNNStartup(); + String operationName = "getNextSPSPathId"; + namesystem.checkSuperuserPrivilege(operationName); + if (nn.isStandbyState()) { + throw new StandbyException("Not supported by Standby Namenode."); + } + return namesystem.getBlockManager().getNextSPSPathId(); + } + + @Override + public boolean checkDNSpaceForScheduling(DatanodeInfo dn, + StorageType type, long estimatedSize) throws IOException { + checkNNStartup(); + String operationName = "checkDNSpaceForScheduling"; + namesystem.checkSuperuserPrivilege(operationName); + if (nn.isStandbyState()) { + throw new StandbyException("Not supported by Standby Namenode."); + } + return namesystem.getBlockManager().getDatanodeManager() + .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize); + } + + @Override + public boolean hasLowRedundancyBlocks(long inodeId) throws IOException { + checkNNStartup(); + String operationName = "hasLowRedundancyBlocks"; + namesystem.checkSuperuserPrivilege(operationName); + if (nn.isStandbyState()) { + throw new StandbyException("Not supported by Standby Namenode."); + } + return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index 39c50a711e..8a10183c08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -319,12 +319,16 @@ public void run() { String reClass = t.getClass().getName(); if (InterruptedException.class.getName().equals(reClass)) { LOG.info("SPSPathIdProcessor thread is interrupted. Stopping.."); - Thread.currentThread().interrupt(); break; } LOG.warn("Exception while scanning file inodes to satisfy the policy", t); - // TODO: may be we should retry the current inode id? + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting in SPSPathIdProcessor", t); + break; + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index f103dfe61c..bddbc1b847 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -149,8 +149,8 @@ DatanodeStorageReport[] getLiveDatanodeStorageReport() * @return true if the given datanode has sufficient space to occupy blockSize * data, false otherwise. 
*/ - boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, - StorageType type, long blockSize); + boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, + long blockSize); /** * @return next SPS path id to process. @@ -175,4 +175,9 @@ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, */ String getFilePath(Long inodeId); + /** + * Close the resources. + */ + void close() throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index c658812f8f..191886c70b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -98,17 +97,8 @@ public DatanodeStorageReport[] getLiveDatanodeStorageReport() } @Override - public boolean hasLowRedundancyBlocks(long inodeID) { - namesystem.readLock(); - try { - BlockCollection bc = namesystem.getBlockCollection(inodeID); - if (bc == null) { - return false; - } - return blockManager.hasLowRedundancyBlocks(bc); - } finally { - namesystem.readUnlock(); - } + public boolean hasLowRedundancyBlocks(long inodeId) { + return blockManager.hasLowRedundancyBlocks(inodeId); } @Override @@ -170,8 +160,8 @@ public long getFileID(String path) throws UnresolvedLinkException, } @Override - public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn, - StorageType type, long blockSize) { + public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, + long blockSize) { namesystem.readLock(); try { DatanodeDescriptor datanode = blockManager.getDatanodeManager() @@ -205,4 +195,9 @@ public void removeAllSPSPathIds() { public String getFilePath(Long inodeId) { return namesystem.getFilePath(inodeId); } + + @Override + public void close() throws IOException { + // Nothing to clean. + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 33ad6f418f..89799fc92f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -325,6 +325,9 @@ public void run() { } } } + } else { + LOG.info("Namenode is in safemode. 
It will retry again."); + Thread.sleep(3000); } int numLiveDn = ctxt.getNumLiveDataNodes(); if (storageMovementNeeded.size() == 0 @@ -706,8 +709,8 @@ private void buildStripedBlockMovingInfos(LocatedBlock blockInfo, private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo, DatanodeInfo source, List targetTypes) { for (StorageType t : targetTypes) { - boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling( - source, t, blockInfo.getBlockSize()); + boolean goodTargetDn = + ctxt.checkDNSpaceForScheduling(source, t, blockInfo.getBlockSize()); if (goodTargetDn) { return new StorageTypeNodePair(t, source); } @@ -720,8 +723,8 @@ private StorageTypeNodePair chooseTarget(LocatedBlock block, StorageTypeNodeMap locsForExpectedStorageTypes, List excludeNodes) { for (StorageType t : targetTypes) { - List nodesWithStorages = locsForExpectedStorageTypes - .getNodesWithStorages(t); + List nodesWithStorages = + locsForExpectedStorageTypes.getNodesWithStorages(t); if (nodesWithStorages == null || nodesWithStorages.isEmpty()) { continue; // no target nodes with the required storage type. } @@ -729,8 +732,8 @@ private StorageTypeNodePair chooseTarget(LocatedBlock block, for (DatanodeInfo target : nodesWithStorages) { if (!excludeNodes.contains(target) && matcher.match(ctxt.getNetworkTopology(), source, target)) { - boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling( - target, t, block.getBlockSize()); + boolean goodTargetDn = + ctxt.checkDNSpaceForScheduling(target, t, block.getBlockSize()); if (goodTargetDn) { return new StorageTypeNodePair(t, target); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 0c8adc639a..9f5caddd2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -31,7 +32,8 @@ /***************************************************************************** * Protocol that a secondary NameNode uses to communicate with the NameNode. - * It's used to get part of the name node state + * Also used by the external storage policy satisfier. It's used to get part of the + * name node state *****************************************************************************/ @KerberosInfo( serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY) @@ -202,5 +204,47 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId) */ @Idempotent boolean isRollingUpgrade() throws IOException; + + /** + * Gets the file path for the given file id. This API is used by external SPS. + * + * @param inodeId + * - file inode id. + * @return path + */ + @Idempotent + String getFilePath(Long inodeId) throws IOException; + + /** + * @return Gets the next available sps path id, otherwise null. This API is used + * by external SPS.
+ */ + @AtMostOnce + Long getNextSPSPathId() throws IOException; + + /** + * Verifies whether the given Datanode has enough space of the given storage + * type for scheduling the block. This API is used by external SPS. + * + * @param dn + * - datanode + * @param type + * - storage type + * @param estimatedSize + * - size + */ + @Idempotent + boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, + long estimatedSize) throws IOException; + + /** + * Checks whether the given file id has any low redundancy blocks. This API is + * used by external SPS. + * + * @param inodeID + * - inode id. + */ + @Idempotent + boolean hasLowRedundancyBlocks(long inodeID) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java new file mode 100644 index 0000000000..e5b04bab04 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.sps; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.AccessControlException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to connect to the Namenode and get the required + * information for SPS from the Namenode state.
+ */ +@InterfaceAudience.Private +public class ExternalSPSContext implements Context { + public static final Logger LOG = + LoggerFactory.getLogger(ExternalSPSContext.class); + private SPSService service; + private NameNodeConnector nnc = null; + private Object nnConnectionLock = new Object(); + private BlockStoragePolicySuite createDefaultSuite = + BlockStoragePolicySuite.createDefaultSuite(); + + public ExternalSPSContext(SPSService service) { + this.service = service; + initializeNamenodeConnector(); + } + + @Override + public boolean isRunning() { + return service.isRunning(); + } + + @Override + public boolean isInSafeMode() { + initializeNamenodeConnector(); + try { + return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode() + : false; + } catch (IOException e) { + LOG.warn("Exception while creating Namenode Connector.", e); + return false; + } + } + + @Override + public boolean isMoverRunning() { + initializeNamenodeConnector(); + try { + FSDataOutputStream out = nnc.getDistributedFileSystem() + .append(HdfsServerConstants.MOVER_ID_PATH); + out.close(); + return false; + } catch (IOException ioe) { + LOG.warn("Exception while checking whether mover is running.", ioe); + return true; + } + + } + + @Override + public long getFileID(String path) throws UnresolvedLinkException, + AccessControlException, ParentNotDirectoryException { + initializeNamenodeConnector(); + HdfsFileStatus fs = null; + try { + fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus( + new Path(path)); + LOG.info("Fetched the fileID:{} for the path:{}", fs.getFileId(), path); + } catch (IllegalArgumentException | IOException e) { + LOG.warn("Exception while getting file id for the given path:{}.", path, + e); + } + return fs != null ? fs.getFileId() : 0; + } + + @Override + public NetworkTopology getNetworkTopology() { + return NetworkTopology.getInstance(service.getConf()); + } + + @Override + public boolean isFileExist(long inodeId) { + initializeNamenodeConnector(); + String filePath = null; + try { + filePath = getFilePath(inodeId); + return nnc.getDistributedFileSystem().exists(new Path(filePath)); + } catch (IllegalArgumentException | IOException e) { + LOG.warn("Exception while checking existence of the given path:{} " + + "and fileId:{}", filePath, inodeId, e); + } + return false; + } + + @Override + public BlockStoragePolicy getStoragePolicy(byte policyId) { + return createDefaultSuite.getPolicy(policyId); + } + + @Override + public void addDropPreviousSPSWorkAtDNs() { + // Nothing to do + } + + @Override + public void removeSPSHint(long inodeId) throws IOException { + initializeNamenodeConnector(); + nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)), + HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); + } + + @Override + public int getNumLiveDataNodes() { + initializeNamenodeConnector(); + try { + return nnc.getDistributedFileSystem() + .getDataNodeStats(DatanodeReportType.LIVE).length; + } catch (IOException e) { + LOG.warn("Exception while getting number of live datanodes.", e); + } + return 0; + } + + @Override + public HdfsFileStatus getFileInfo(long inodeID) throws IOException { + initializeNamenodeConnector(); + return nnc.getDistributedFileSystem().getClient() + .getLocatedFileInfo(getFilePath(inodeID), false); + } + + @Override + public DatanodeStorageReport[] getLiveDatanodeStorageReport() + throws IOException { + initializeNamenodeConnector(); + return nnc.getLiveDatanodeStorageReport(); + } + + @Override + public boolean hasLowRedundancyBlocks(long
inodeID) { + initializeNamenodeConnector(); + try { + return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID); + } catch (IOException e) { + LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.", + inodeID, e); + return false; + } + } + + @Override + public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, + long estimatedSize) { + initializeNamenodeConnector(); + try { + return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type, + estimatedSize); + } catch (IOException e) { + LOG.warn("Failed to verify whether the given datanode:{} has enough " + + "space for scheduling.", dn, e); + return false; + } + } + + @Override + public Long getNextSPSPathId() { + initializeNamenodeConnector(); + try { + return nnc.getNNProtocolConnection().getNextSPSPathId(); + } catch (IOException e) { + LOG.warn("Exception while getting next sps path id from Namenode.", e); + return null; + } + } + + @Override + public void removeSPSPathId(long pathId) { + // We need not specifically implement for external. + } + + @Override + public void removeAllSPSPathIds() { + // We need not specifically implement for external. + } + + @Override + public String getFilePath(Long inodeId) { + try { + return nnc.getNNProtocolConnection().getFilePath(inodeId); + } catch (IOException e) { + LOG.warn("Exception while getting file path for id:{} from Namenode.", + inodeId, e); + return null; + } + } + + @Override + public void close() throws IOException { + synchronized (nnConnectionLock) { + if (nnc != null) { + nnc.close(); + } + } + } + + private void initializeNamenodeConnector() { + synchronized (nnConnectionLock) { + if (nnc == null) { + try { + nnc = getNameNodeConnector(service.getConf()); + } catch (IOException e) { + LOG.warn("Exception while creating Namenode Connector. "
+ + "Namenode might not have started.", e); + } + } + } + } + + public static NameNodeConnector getNameNodeConnector(Configuration conf) + throws IOException { + final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + List nncs = Collections.emptyList(); + NameNodeConnector.checkOtherInstanceRunning(false); + nncs = NameNodeConnector.newNameNodeConnectors(namenodes, + ExternalSPSContext.class.getSimpleName(), + HdfsServerConstants.MOVER_ID_PATH, conf, + NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS); + return nncs.get(0); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 683dc80232..b0e900d450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -206,6 +206,39 @@ message IsRollingUpgradeResponseProto { required bool isRollingUpgrade = 1; } +message GetFilePathRequestProto { + required uint64 fileId = 1; +} + +message GetFilePathResponseProto { + required string srcPath = 1; +} + +message GetNextSPSPathIdRequestProto { +} + +message GetNextSPSPathIdResponseProto { + optional uint64 fileId = 1; +} + +message CheckDNSpaceRequestProto { + required DatanodeInfoProto dnInfo = 1; + required StorageTypeProto storageType = 2; + required uint64 estimatedSize = 3; +} + +message CheckDNSpaceResponseProto { + required bool isGoodDatanodeWithSpace = 1; +} + +message HasLowRedundancyBlocksRequestProto { + required uint64 inodeId = 1; +} + +message HasLowRedundancyBlocksResponseProto { + required bool hasLowRedundancyBlocks = 1; +} + /** * Protocol used by the sub-ordinate namenode to send requests * the active/primary namenode. */ @@ -287,4 +320,28 @@ service NamenodeProtocolService { */ rpc isRollingUpgrade(IsRollingUpgradeRequestProto) returns (IsRollingUpgradeResponseProto); + + /** + * Return the corresponding file path for the given file id + */ + rpc getFilePath(GetFilePathRequestProto) + returns (GetFilePathResponseProto); + + /** + * Return the next sps path id from namenode + */ + rpc getNextSPSPathId(GetNextSPSPathIdRequestProto) + returns (GetNextSPSPathIdResponseProto); + + /** + * Verify whether the given datanode has enough space for scheduling the block + */ + rpc checkDNSpaceForScheduling(CheckDNSpaceRequestProto) + returns (CheckDNSpaceResponseProto); + + /** + * Check whether the given file id has low redundancy blocks.
+ */ + rpc hasLowRedundancyBlocks(HasLowRedundancyBlocksRequestProto) + returns (HasLowRedundancyBlocksResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java index 42b04da82c..fe08b8ff25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener; import org.apache.hadoop.hdfs.server.namenode.sps.Context; import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; -import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; @@ -96,14 +95,8 @@ public MiniDFSCluster startCluster(final Configuration conf, SPSService spsService = blkMgr.getSPSService(); spsService.stopGracefully(); - // TODO: Since External is not fully implemented, just used INTERNAL now. - // Need to set External context here. - IntraSPSNameNodeContext context = new IntraSPSNameNodeContext( - cluster.getNameNode().getNamesystem(), blkMgr, blkMgr.getSPSService()) { - public boolean isRunning() { - return true; - }; - }; + ExternalSPSContext context = new ExternalSPSContext(spsService); + ExternalBlockMovementListener blkMoveListener = new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = @@ -131,15 +124,7 @@ public void restartNamenode() throws IOException{ spsService = blkMgr.getSPSService(); spsService.stopGracefully(); - // TODO: Since External is not fully implemented, just used INTERNAL now. - // Need to set External context here. - IntraSPSNameNodeContext context = new IntraSPSNameNodeContext( - getCluster().getNameNode().getNamesystem(), blkMgr, - blkMgr.getSPSService()) { - public boolean isRunning() { - return true; - }; - }; + ExternalSPSContext context = new ExternalSPSContext(spsService); ExternalBlockMovementListener blkMoveListener = new ExternalBlockMovementListener(); ExternalSPSBlockMoveTaskHandler externalHandler = @@ -180,7 +165,7 @@ private NameNodeConnector getNameNodeConnector(Configuration conf) for (URI nn : namenodes) { nnMap.put(nn, null); } - final Path externalSPSPathId = new Path("/system/externalSPS.id"); + final Path externalSPSPathId = new Path("/system/tmp.id"); final List nncs = NameNodeConnector .newNameNodeConnectors(nnMap, StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId, @@ -204,6 +189,14 @@ public void testBatchProcessingForSPSDirectory() throws Exception { public void testStoragePolicySatisfyPathStatus() throws Exception { } + /** + * This test case is more specific to internal. + */ + @Ignore("This test is specific to internal, so skipping here.") + public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() + throws Exception { + } + /** * Status won't be supported for external SPS, now. So, ignoring it. */