HDFS-11695: [SPS]: Namenode failed to start while loading SPS xAttrs from the edits log. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
6fe6c549e8
commit
5ce332dc9a
@ -27,7 +27,6 @@
|
|||||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
@ -43,14 +42,12 @@
|
|||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
|
||||||
|
|
||||||
public class FSDirAttrOp {
|
public class FSDirAttrOp {
|
||||||
static FileStatus setPermission(
|
static FileStatus setPermission(
|
||||||
@ -193,29 +190,6 @@ static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
|
|||||||
return fsd.getAuditFileInfo(iip);
|
return fsd.getAuditFileInfo(iip);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
|
|
||||||
String src, boolean logRetryCache) throws IOException {
|
|
||||||
|
|
||||||
FSPermissionChecker pc = fsd.getPermissionChecker();
|
|
||||||
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
|
||||||
INodesInPath iip;
|
|
||||||
fsd.writeLock();
|
|
||||||
try {
|
|
||||||
|
|
||||||
// check operation permission.
|
|
||||||
iip = fsd.resolvePath(pc, src, DirOp.WRITE);
|
|
||||||
if (fsd.isPermissionEnabled()) {
|
|
||||||
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
|
||||||
}
|
|
||||||
XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
|
|
||||||
xAttrs.add(satisfyXAttr);
|
|
||||||
} finally {
|
|
||||||
fsd.writeUnlock();
|
|
||||||
}
|
|
||||||
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
|
||||||
return fsd.getAuditFileInfo(iip);
|
|
||||||
}
|
|
||||||
|
|
||||||
static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
|
static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return bm.getStoragePolicies();
|
return bm.getStoragePolicies();
|
||||||
@ -477,71 +451,6 @@ static void unprotectedSetStoragePolicy(FSDirectory fsd, BlockManager bm,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
|
|
||||||
BlockManager bm, FSDirectory fsd) throws IOException {
|
|
||||||
|
|
||||||
final INode inode = FSDirectory.resolveLastINode(iip);
|
|
||||||
final int snapshotId = iip.getLatestSnapshotId();
|
|
||||||
final List<INode> candidateNodes = new ArrayList<>();
|
|
||||||
|
|
||||||
// TODO: think about optimization here, label the dir instead
|
|
||||||
// of the sub-files of the dir.
|
|
||||||
if (inode.isFile()) {
|
|
||||||
candidateNodes.add(inode);
|
|
||||||
} else if (inode.isDirectory()) {
|
|
||||||
for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
|
|
||||||
if (node.isFile()) {
|
|
||||||
candidateNodes.add(node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If node has satisfy xattr, then stop adding it
|
|
||||||
// to satisfy movement queue.
|
|
||||||
if (inodeHasSatisfyXAttr(candidateNodes)) {
|
|
||||||
throw new IOException(
|
|
||||||
"Cannot request to call satisfy storage policy on path "
|
|
||||||
+ iip.getPath()
|
|
||||||
+ ", as this file/dir was already called for satisfying "
|
|
||||||
+ "storage policy.");
|
|
||||||
}
|
|
||||||
|
|
||||||
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
|
|
||||||
final XAttr satisfyXAttr =
|
|
||||||
XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
|
|
||||||
xattrs.add(satisfyXAttr);
|
|
||||||
|
|
||||||
for (INode node : candidateNodes) {
|
|
||||||
bm.satisfyStoragePolicy(node.getId());
|
|
||||||
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
|
|
||||||
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(
|
|
||||||
fsd, existingXAttrs, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
|
|
||||||
XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
|
|
||||||
}
|
|
||||||
return satisfyXAttr;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
|
|
||||||
// If the node is a directory and one of the child files
|
|
||||||
// has satisfy xattr, then return true for this directory.
|
|
||||||
for (INode inode : candidateNodes) {
|
|
||||||
final XAttrFeature f = inode.getXAttrFeature();
|
|
||||||
if (inode.isFile() &&
|
|
||||||
f != null && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void unprotectedRemoveSPSXAttr(INode inode, XAttr spsXAttr)
|
|
||||||
throws IOException{
|
|
||||||
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
|
|
||||||
existingXAttrs.remove(spsXAttr);
|
|
||||||
XAttrStorage.updateINodeXAttrs(inode, existingXAttrs,
|
|
||||||
INodesInPath.fromINode(inode).getLatestSnapshotId());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setDirStoragePolicy(
|
private static void setDirStoragePolicy(
|
||||||
FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
|
FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
|
||||||
INode inode = FSDirectory.resolveLastINode(iip);
|
INode inode = FSDirectory.resolveLastINode(iip);
|
||||||
|
@ -0,0 +1,145 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.XAttr;
|
||||||
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to perform storage policy satisfier related operations.
|
||||||
|
*/
|
||||||
|
final class FSDirSatisfyStoragePolicyOp {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private constructor for preventing FSDirSatisfyStoragePolicyOp object
|
||||||
|
* creation. Static-only class.
|
||||||
|
*/
|
||||||
|
private FSDirSatisfyStoragePolicyOp() {
|
||||||
|
}
|
||||||
|
|
||||||
|
static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
|
||||||
|
String src, boolean logRetryCache) throws IOException {
|
||||||
|
|
||||||
|
assert fsd.getFSNamesystem().hasWriteLock();
|
||||||
|
FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||||
|
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
||||||
|
INodesInPath iip;
|
||||||
|
fsd.writeLock();
|
||||||
|
try {
|
||||||
|
|
||||||
|
// check operation permission.
|
||||||
|
iip = fsd.resolvePath(pc, src, DirOp.WRITE);
|
||||||
|
if (fsd.isPermissionEnabled()) {
|
||||||
|
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
||||||
|
}
|
||||||
|
XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
|
||||||
|
xAttrs.add(satisfyXAttr);
|
||||||
|
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
||||||
|
} finally {
|
||||||
|
fsd.writeUnlock();
|
||||||
|
}
|
||||||
|
return fsd.getAuditFileInfo(iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
|
||||||
|
BlockManager bm, FSDirectory fsd) throws IOException {
|
||||||
|
|
||||||
|
final INode inode = FSDirectory.resolveLastINode(iip);
|
||||||
|
final int snapshotId = iip.getLatestSnapshotId();
|
||||||
|
final List<INode> candidateNodes = new ArrayList<>();
|
||||||
|
|
||||||
|
// TODO: think about optimization here, label the dir instead
|
||||||
|
// of the sub-files of the dir.
|
||||||
|
if (inode.isFile()) {
|
||||||
|
candidateNodes.add(inode);
|
||||||
|
} else if (inode.isDirectory()) {
|
||||||
|
for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
|
||||||
|
if (node.isFile()) {
|
||||||
|
candidateNodes.add(node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If node has satisfy xattr, then stop adding it
|
||||||
|
// to satisfy movement queue.
|
||||||
|
if (inodeHasSatisfyXAttr(candidateNodes)) {
|
||||||
|
throw new IOException(
|
||||||
|
"Cannot request to call satisfy storage policy on path "
|
||||||
|
+ iip.getPath()
|
||||||
|
+ ", as this file/dir was already called for satisfying "
|
||||||
|
+ "storage policy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
|
||||||
|
final XAttr satisfyXAttr = XAttrHelper
|
||||||
|
.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
|
||||||
|
xattrs.add(satisfyXAttr);
|
||||||
|
|
||||||
|
for (INode node : candidateNodes) {
|
||||||
|
bm.satisfyStoragePolicy(node.getId());
|
||||||
|
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
|
||||||
|
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
|
||||||
|
xattrs, EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
|
XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
|
||||||
|
}
|
||||||
|
return satisfyXAttr;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
|
||||||
|
// If the node is a directory and one of the child files
|
||||||
|
// has satisfy xattr, then return true for this directory.
|
||||||
|
for (INode inode : candidateNodes) {
|
||||||
|
final XAttrFeature f = inode.getXAttrFeature();
|
||||||
|
if (inode.isFile() && f != null
|
||||||
|
&& f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void removeSPSXattr(FSDirectory fsd, INode inode, XAttr spsXAttr)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
fsd.writeLock();
|
||||||
|
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
|
||||||
|
existingXAttrs.remove(spsXAttr);
|
||||||
|
XAttrStorage.updateINodeXAttrs(inode, existingXAttrs, INodesInPath
|
||||||
|
.fromINode(inode).getLatestSnapshotId());
|
||||||
|
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
||||||
|
xAttrs.add(spsXAttr);
|
||||||
|
fsd.getEditLog().logRemoveXAttrs(inode.getFullPathName(), xAttrs, false);
|
||||||
|
} finally {
|
||||||
|
fsd.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -297,7 +297,7 @@ static INode unprotectedSetXAttrs(
|
|||||||
|
|
||||||
// Add inode id to movement queue if xattrs contain satisfy xattr.
|
// Add inode id to movement queue if xattrs contain satisfy xattr.
|
||||||
if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
|
if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
|
||||||
FSDirAttrOp.unprotectedSatisfyStoragePolicy(iip,
|
FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(iip,
|
||||||
fsd.getBlockManager(), fsd);
|
fsd.getBlockManager(), fsd);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1418,22 +1418,6 @@ private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
|
|||||||
getBlockManager().satisfyStoragePolicy(inode.getId());
|
getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove the SPS xattr from the inode, retrieve the inode from the
|
|
||||||
* block collection id.
|
|
||||||
* @param id
|
|
||||||
* - file block collection id.
|
|
||||||
*/
|
|
||||||
public void removeSPSXattr(long id) throws IOException {
|
|
||||||
final INode inode = getInode(id);
|
|
||||||
final XAttrFeature xaf = inode.getXAttrFeature();
|
|
||||||
final XAttr spsXAttr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
|
|
||||||
|
|
||||||
if (spsXAttr != null) {
|
|
||||||
FSDirAttrOp.unprotectedRemoveSPSXAttr(inode, spsXAttr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addEncryptionZone(INodeWithAdditionalFields inode,
|
private void addEncryptionZone(INodeWithAdditionalFields inode,
|
||||||
XAttrFeature xaf) {
|
XAttrFeature xaf) {
|
||||||
if (xaf == null) {
|
if (xaf == null) {
|
||||||
|
@ -2262,10 +2262,12 @@ void satisfyStoragePolicy(String src, boolean logRetryCache)
|
|||||||
+ " by admin. Seek for an admin help to activate it "
|
+ " by admin. Seek for an admin help to activate it "
|
||||||
+ "or use Mover tool.");
|
+ "or use Mover tool.");
|
||||||
}
|
}
|
||||||
FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src, logRetryCache);
|
FSDirSatisfyStoragePolicyOp.satisfyStoragePolicy(dir, blockManager, src,
|
||||||
|
logRetryCache);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
getEditLog().logSync();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -7855,6 +7857,26 @@ void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
|
|||||||
logAuditEvent(true, operationName, src, null, auditStat);
|
logAuditEvent(true, operationName, src, null, auditStat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeXattr(long id, String xattrName) throws IOException {
|
||||||
|
writeLock();
|
||||||
|
try {
|
||||||
|
final INode inode = dir.getInode(id);
|
||||||
|
final XAttrFeature xaf = inode.getXAttrFeature();
|
||||||
|
if (xaf == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final XAttr spsXAttr = xaf.getXAttr(xattrName);
|
||||||
|
|
||||||
|
if (spsXAttr != null) {
|
||||||
|
FSDirSatisfyStoragePolicyOp.removeSPSXattr(dir, inode, spsXAttr);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writeUnlock("removeXAttr");
|
||||||
|
}
|
||||||
|
getEditLog().logSync();
|
||||||
|
}
|
||||||
|
|
||||||
void checkAccess(String src, FsAction mode) throws IOException {
|
void checkAccess(String src, FsAction mode) throws IOException {
|
||||||
final String operationName = "checkAccess";
|
final String operationName = "checkAccess";
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||||
@ -52,4 +54,12 @@ public interface Namesystem extends RwLock, SafeMode {
|
|||||||
* @return true if valid write lease exists, otherwise return false.
|
* @return true if valid write lease exists, otherwise return false.
|
||||||
*/
|
*/
|
||||||
boolean isFileOpenedForWrite(String filePath);
|
boolean isFileOpenedForWrite(String filePath);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove xAttr from the inode.
|
||||||
|
* @param id
|
||||||
|
* @param xattrName
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void removeXattr(long id, String xattrName) throws IOException;
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -829,6 +831,6 @@ private void clearQueuesWithNotification() {
|
|||||||
*/
|
*/
|
||||||
public void postBlkStorageMovementCleanup(long trackId)
|
public void postBlkStorageMovementCleanup(long trackId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.namesystem.getFSDirectory().removeSPSXattr(trackId);
|
this.namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test persistence of satisfying files/directories.
|
* Test persistence of satisfying files/directories.
|
||||||
@ -60,11 +61,9 @@ public class TestPersistentStoragePolicySatisfier {
|
|||||||
private static final String ALL_SSD = "ALL_SSD";
|
private static final String ALL_SSD = "ALL_SSD";
|
||||||
|
|
||||||
private static StorageType[][] storageTypes = new StorageType[][] {
|
private static StorageType[][] storageTypes = new StorageType[][] {
|
||||||
{StorageType.ARCHIVE, StorageType.DISK},
|
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
|
||||||
{StorageType.DISK, StorageType.SSD},
|
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
|
||||||
{StorageType.SSD, StorageType.RAM_DISK},
|
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD}
|
||||||
{StorageType.ARCHIVE, StorageType.DISK},
|
|
||||||
{StorageType.ARCHIVE, StorageType.SSD}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
private final int timeout = 300000;
|
private final int timeout = 300000;
|
||||||
@ -94,10 +93,13 @@ public void clusterSetUp(Configuration hdfsConf) throws Exception {
|
|||||||
private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
|
private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
conf = newConf;
|
conf = newConf;
|
||||||
|
conf.set(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
|
||||||
|
"3000");
|
||||||
final int dnNumber = storageTypes.length;
|
final int dnNumber = storageTypes.length;
|
||||||
final short replication = 3;
|
final short replication = 3;
|
||||||
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
|
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
|
||||||
.storageTypes(storageTypes)
|
.storageTypes(storageTypes).storagesPerDatanode(3)
|
||||||
.numDataNodes(dnNumber);
|
.numDataNodes(dnNumber);
|
||||||
if (isHAEnabled) {
|
if (isHAEnabled) {
|
||||||
clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
|
clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
|
||||||
@ -277,9 +279,10 @@ public void testWithRestarts() throws Exception {
|
|||||||
*/
|
*/
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testWithFederationHA() throws Exception {
|
public void testWithFederationHA() throws Exception {
|
||||||
|
MiniDFSCluster haCluster = null;
|
||||||
try {
|
try {
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
final MiniDFSCluster haCluster = new MiniDFSCluster
|
haCluster = new MiniDFSCluster
|
||||||
.Builder(conf)
|
.Builder(conf)
|
||||||
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
|
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
|
||||||
.storageTypes(storageTypes)
|
.storageTypes(storageTypes)
|
||||||
@ -305,7 +308,14 @@ public void testWithFederationHA() throws Exception {
|
|||||||
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
|
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
clusterShutdown();
|
if(fs != null) {
|
||||||
|
fs.close();
|
||||||
|
fs = null;
|
||||||
|
}
|
||||||
|
if(haCluster != null) {
|
||||||
|
haCluster.shutdown(true);
|
||||||
|
haCluster = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,6 +413,70 @@ public void testSPSShouldNotLeakXattrIfStorageAlreadySatisfied()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test loading of SPS xAttrs from the edits log when satisfyStoragePolicy
|
||||||
|
* called on child file and parent directory.
|
||||||
|
* 1. Create one directory and create one child file.
|
||||||
|
* 2. Set storage policy for child file and call
|
||||||
|
* satisfyStoragePolicy.
|
||||||
|
* 3. wait for SPS to remove xAttr for file child file.
|
||||||
|
* 4. Set storage policy for parent directory and call
|
||||||
|
* satisfyStoragePolicy.
|
||||||
|
* 5. restart the namenode.
|
||||||
|
* NameNode should be started successfully.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testNameNodeRestartWhenSPSCalledOnChildFileAndParentDir()
|
||||||
|
throws Exception {
|
||||||
|
try {
|
||||||
|
clusterSetUp();
|
||||||
|
fs.setStoragePolicy(childFile, "COLD");
|
||||||
|
fs.satisfyStoragePolicy(childFile);
|
||||||
|
DFSTestUtil.waitExpectedStorageType(childFile.toUri().getPath(),
|
||||||
|
StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
|
||||||
|
// wait for SPS to remove Xattr from file
|
||||||
|
Thread.sleep(30000);
|
||||||
|
fs.setStoragePolicy(childDir, "COLD");
|
||||||
|
fs.satisfyStoragePolicy(childDir);
|
||||||
|
try {
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertFalse(e.getMessage().contains(
|
||||||
|
"Cannot request to call satisfy storage policy"));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
clusterShutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test SPS when satisfyStoragePolicy called on child file and
|
||||||
|
* parent directory.
|
||||||
|
* 1. Create one parent directory and child directory.
|
||||||
|
* 2. Create some file in both the directory.
|
||||||
|
* 3. Set storage policy for parent directory and call
|
||||||
|
* satisfyStoragePolicy.
|
||||||
|
* 4. Set storage policy for child directory and call
|
||||||
|
* satisfyStoragePolicy.
|
||||||
|
* 5. restart the namenode.
|
||||||
|
* All the file blocks should satisfy the policy.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testSPSOnChildAndParentDirectory() throws Exception {
|
||||||
|
try {
|
||||||
|
clusterSetUp();
|
||||||
|
fs.setStoragePolicy(parentDir, "COLD");
|
||||||
|
fs.satisfyStoragePolicy(childDir);
|
||||||
|
fs.satisfyStoragePolicy(parentDir);
|
||||||
|
DFSTestUtil.waitExpectedStorageType(childFileName, StorageType.ARCHIVE,
|
||||||
|
3, 30000, cluster.getFileSystem());
|
||||||
|
DFSTestUtil.waitExpectedStorageType(parentFileName, StorageType.ARCHIVE,
|
||||||
|
3, 30000, cluster.getFileSystem());
|
||||||
|
} finally {
|
||||||
|
clusterShutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart the hole env and trigger the DataNode's heart beats.
|
* Restart the hole env and trigger the DataNode's heart beats.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
@ -42,6 +42,7 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
@ -855,7 +856,9 @@ public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
|
|||||||
{StorageType.DISK, StorageType.SSD}};
|
{StorageType.DISK, StorageType.SSD}};
|
||||||
|
|
||||||
int defaultStripedBlockSize =
|
int defaultStripedBlockSize =
|
||||||
ErasureCodingPolicyManager.getSystemPolicies()[0].getCellSize() * 4;
|
StripedFileTestUtil.getDefaultECPolicy().getCellSize() * 4;
|
||||||
|
config.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
|
||||||
|
StripedFileTestUtil.getDefaultECPolicy().getName());
|
||||||
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
|
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
|
||||||
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
||||||
|
Loading…
Reference in New Issue
Block a user