HDFS-8450. Erasure Coding: Consolidate erasure coding zone related implementation into a single class (Contributed by Rakesh R)
This commit is contained in:
parent
b7d6ea8e91
commit
98d340745b
@ -293,3 +293,6 @@
|
|||||||
HDFS-8556. Erasure Coding: Fix usage of 'createZone' (vinayakumarb)
|
HDFS-8556. Erasure Coding: Fix usage of 'createZone' (vinayakumarb)
|
||||||
|
|
||||||
HDFS-8571. Fix TestErasureCodingCli test (Vinayakumar B via waltersu4549)
|
HDFS-8571. Fix TestErasureCodingCli test (Vinayakumar B via waltersu4549)
|
||||||
|
|
||||||
|
HDFS-8450. Erasure Coding: Consolidate erasure coding zone related
|
||||||
|
implementation into a single class (Rakesh R via vinayakumarb)
|
||||||
|
@ -60,14 +60,14 @@ public ErasureCodingZoneManager(FSDirectory dir) {
|
|||||||
this.dir = dir;
|
this.dir = dir;
|
||||||
}
|
}
|
||||||
|
|
||||||
ECSchema getECSchema(INodesInPath iip) throws IOException {
|
ECSchema getErasureCodingSchema(INodesInPath iip) throws IOException {
|
||||||
ErasureCodingZone ecZone = getECZone(iip);
|
ErasureCodingZone ecZone = getErasureCodingZone(iip);
|
||||||
return ecZone == null ? null : ecZone.getSchema();
|
return ecZone == null ? null : ecZone.getSchema();
|
||||||
}
|
}
|
||||||
|
|
||||||
ErasureCodingZone getECZone(INodesInPath iip) throws IOException {
|
ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException {
|
||||||
assert dir.hasReadLock();
|
assert dir.hasReadLock();
|
||||||
Preconditions.checkNotNull(iip);
|
Preconditions.checkNotNull(iip, "INodes cannot be null");
|
||||||
List<INode> inodes = iip.getReadOnlyINodes();
|
List<INode> inodes = iip.getReadOnlyINodes();
|
||||||
for (int i = inodes.size() - 1; i >= 0; i--) {
|
for (int i = inodes.size() - 1; i >= 0; i--) {
|
||||||
final INode inode = inodes.get(i);
|
final INode inode = inodes.get(i);
|
||||||
@ -90,8 +90,8 @@ ErasureCodingZone getECZone(INodesInPath iip) throws IOException {
|
|||||||
DataInputStream dIn=new DataInputStream(bIn);
|
DataInputStream dIn=new DataInputStream(bIn);
|
||||||
int cellSize = WritableUtils.readVInt(dIn);
|
int cellSize = WritableUtils.readVInt(dIn);
|
||||||
String schemaName = WritableUtils.readString(dIn);
|
String schemaName = WritableUtils.readString(dIn);
|
||||||
ECSchema schema = dir.getFSNamesystem().getECSchemaManager()
|
ECSchema schema = dir.getFSNamesystem()
|
||||||
.getSchema(schemaName);
|
.getErasureCodingSchemaManager().getSchema(schemaName);
|
||||||
return new ErasureCodingZone(dir.getInode(inode.getId())
|
return new ErasureCodingZone(dir.getInode(inode.getId())
|
||||||
.getFullPathName(), schema, cellSize);
|
.getFullPathName(), schema, cellSize);
|
||||||
}
|
}
|
||||||
@ -100,22 +100,22 @@ ErasureCodingZone getECZone(INodesInPath iip) throws IOException {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
XAttr createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
List<XAttr> createErasureCodingZone(final INodesInPath srcIIP,
|
||||||
throws IOException {
|
ECSchema schema, int cellSize) throws IOException {
|
||||||
assert dir.hasWriteLock();
|
assert dir.hasWriteLock();
|
||||||
final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
|
Preconditions.checkNotNull(srcIIP, "INodes cannot be null");
|
||||||
|
String src = srcIIP.getPath();
|
||||||
if (dir.isNonEmptyDirectory(srcIIP)) {
|
if (dir.isNonEmptyDirectory(srcIIP)) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Attempt to create an erasure coding zone for a " +
|
"Attempt to create an erasure coding zone for a " +
|
||||||
"non-empty directory.");
|
"non-empty directory " + src);
|
||||||
}
|
}
|
||||||
if (srcIIP != null &&
|
if (srcIIP.getLastINode() != null &&
|
||||||
srcIIP.getLastINode() != null &&
|
|
||||||
!srcIIP.getLastINode().isDirectory()) {
|
!srcIIP.getLastINode().isDirectory()) {
|
||||||
throw new IOException("Attempt to create an erasure coding zone " +
|
throw new IOException("Attempt to create an erasure coding zone " +
|
||||||
"for a file.");
|
"for a file " + src);
|
||||||
}
|
}
|
||||||
if (getECSchema(srcIIP) != null) {
|
if (getErasureCodingSchema(srcIIP) != null) {
|
||||||
throw new IOException("Directory " + src + " is already in an " +
|
throw new IOException("Directory " + src + " is already in an " +
|
||||||
"erasure coding zone.");
|
"erasure coding zone.");
|
||||||
}
|
}
|
||||||
@ -147,14 +147,14 @@ XAttr createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
|||||||
xattrs.add(ecXAttr);
|
xattrs.add(ecXAttr);
|
||||||
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
|
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
|
||||||
EnumSet.of(XAttrSetFlag.CREATE));
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
return ecXAttr;
|
return xattrs;
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
|
void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
assert dir.hasReadLock();
|
assert dir.hasReadLock();
|
||||||
final ErasureCodingZone srcZone = getECZone(srcIIP);
|
final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP);
|
||||||
final ErasureCodingZone dstZone = getECZone(dstIIP);
|
final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP);
|
||||||
if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) {
|
if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,217 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.XAttr;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to perform erasure coding related operations.
|
||||||
|
*/
|
||||||
|
final class FSDirErasureCodingOp {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private constructor for preventing FSDirErasureCodingOp object
|
||||||
|
* creation. Static-only class.
|
||||||
|
*/
|
||||||
|
private FSDirErasureCodingOp() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an erasure coding zone on directory src.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param srcArg the path of a directory which will be the root of the
|
||||||
|
* erasure coding zone. The directory must be empty.
|
||||||
|
* @param schema ECSchema for the erasure coding zone
|
||||||
|
* @param cellSize Cell size of stripe
|
||||||
|
* @param logRetryCache whether to record RPC ids in editlog for retry
|
||||||
|
* cache rebuilding
|
||||||
|
* @return {@link HdfsFileStatus}
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static HdfsFileStatus createErasureCodingZone(final FSNamesystem fsn,
|
||||||
|
final String srcArg, final ECSchema schema, final int cellSize,
|
||||||
|
final boolean logRetryCache) throws IOException {
|
||||||
|
assert fsn.hasWriteLock();
|
||||||
|
|
||||||
|
String src = srcArg;
|
||||||
|
FSPermissionChecker pc = null;
|
||||||
|
byte[][] pathComponents = null;
|
||||||
|
pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||||
|
pc = fsn.getPermissionChecker();
|
||||||
|
FSDirectory fsd = fsn.getFSDirectory();
|
||||||
|
src = fsd.resolvePath(pc, src, pathComponents);
|
||||||
|
final INodesInPath iip;
|
||||||
|
List<XAttr> xAttrs;
|
||||||
|
fsd.writeLock();
|
||||||
|
try {
|
||||||
|
iip = fsd.getINodesInPath4Write(src, false);
|
||||||
|
xAttrs = fsn.getErasureCodingZoneManager().createErasureCodingZone(
|
||||||
|
iip, schema, cellSize);
|
||||||
|
} finally {
|
||||||
|
fsd.writeUnlock();
|
||||||
|
}
|
||||||
|
fsn.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
||||||
|
return fsd.getAuditFileInfo(iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the erasure coding zone information for specified path.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param src path
|
||||||
|
* @return {@link ErasureCodingZone}
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn,
|
||||||
|
final String src) throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
final INodesInPath iip = getINodesInPath(fsn, src);
|
||||||
|
return getErasureCodingZoneForPath(fsn, iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get erasure coding zone information for specified path.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param iip inodes in the path containing the file
|
||||||
|
* @return {@link ErasureCodingZone}
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn,
|
||||||
|
final INodesInPath iip) throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
return getErasureCodingZoneForPath(fsn, iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the file is in erasure coding zone.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param srcArg path
|
||||||
|
* @return true represents the file is in erasure coding zone, false otw
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static boolean isInErasureCodingZone(final FSNamesystem fsn,
|
||||||
|
final String srcArg) throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
final INodesInPath iip = getINodesInPath(fsn, srcArg);
|
||||||
|
return getErasureCodingSchemaForPath(fsn, iip) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the file is in erasure coding zone.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param iip inodes in the path containing the file
|
||||||
|
* @return true represents the file is in erasure coding zone, false otw
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static boolean isInErasureCodingZone(final FSNamesystem fsn,
|
||||||
|
final INodesInPath iip) throws IOException {
|
||||||
|
return getErasureCodingSchema(fsn, iip) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get erasure coding schema.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param iip inodes in the path containing the file
|
||||||
|
* @return {@link ECSchema}
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static ECSchema getErasureCodingSchema(final FSNamesystem fsn,
|
||||||
|
final INodesInPath iip) throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
return getErasureCodingSchemaForPath(fsn, iip);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get available erasure coding schemas.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @return {@link ECSchema} array
|
||||||
|
*/
|
||||||
|
static ECSchema[] getErasureCodingSchemas(final FSNamesystem fsn)
|
||||||
|
throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
return fsn.getErasureCodingSchemaManager().getSchemas();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the ECSchema specified by the name.
|
||||||
|
*
|
||||||
|
* @param fsn namespace
|
||||||
|
* @param schemaName schema name
|
||||||
|
* @return {@link ECSchema}
|
||||||
|
*/
|
||||||
|
static ECSchema getErasureCodingSchema(final FSNamesystem fsn,
|
||||||
|
final String schemaName) throws IOException {
|
||||||
|
assert fsn.hasReadLock();
|
||||||
|
|
||||||
|
return fsn.getErasureCodingSchemaManager().getSchema(schemaName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static INodesInPath getINodesInPath(final FSNamesystem fsn,
|
||||||
|
final String srcArg) throws IOException {
|
||||||
|
String src = srcArg;
|
||||||
|
final byte[][] pathComponents = FSDirectory
|
||||||
|
.getPathComponentsForReservedPath(src);
|
||||||
|
final FSDirectory fsd = fsn.getFSDirectory();
|
||||||
|
final FSPermissionChecker pc = fsn.getPermissionChecker();
|
||||||
|
src = fsd.resolvePath(pc, src, pathComponents);
|
||||||
|
INodesInPath iip = fsd.getINodesInPath(src, true);
|
||||||
|
if (fsn.isPermissionEnabled()) {
|
||||||
|
fsn.getFSDirectory().checkPathAccess(pc, iip, FsAction.READ);
|
||||||
|
}
|
||||||
|
return iip;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ErasureCodingZone getErasureCodingZoneForPath(
|
||||||
|
final FSNamesystem fsn, final INodesInPath iip) throws IOException {
|
||||||
|
final FSDirectory fsd = fsn.getFSDirectory();
|
||||||
|
fsd.readLock();
|
||||||
|
try {
|
||||||
|
return fsn.getErasureCodingZoneManager().getErasureCodingZone(iip);
|
||||||
|
} finally {
|
||||||
|
fsd.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ECSchema getErasureCodingSchemaForPath(final FSNamesystem fsn,
|
||||||
|
final INodesInPath iip) throws IOException {
|
||||||
|
final FSDirectory fsd = fsn.getFSDirectory();
|
||||||
|
fsd.readLock();
|
||||||
|
try {
|
||||||
|
return fsn.getErasureCodingZoneManager().getErasureCodingSchema(iip);
|
||||||
|
} finally {
|
||||||
|
fsd.readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -386,7 +386,8 @@ static HdfsFileStatus createFileStatus(
|
|||||||
final FileEncryptionInfo feInfo = isRawPath ? null :
|
final FileEncryptionInfo feInfo = isRawPath ? null :
|
||||||
fsd.getFileEncryptionInfo(node, snapshot, iip);
|
fsd.getFileEncryptionInfo(node, snapshot, iip);
|
||||||
|
|
||||||
final ErasureCodingZone ecZone = fsd.getECZone(iip);
|
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
|
||||||
|
fsd.getFSNamesystem(), iip);
|
||||||
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
||||||
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
||||||
|
|
||||||
@ -468,7 +469,8 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
|
|||||||
}
|
}
|
||||||
int childrenNum = node.isDirectory() ?
|
int childrenNum = node.isDirectory() ?
|
||||||
node.asDirectory().getChildrenNum(snapshot) : 0;
|
node.asDirectory().getChildrenNum(snapshot) : 0;
|
||||||
final ErasureCodingZone ecZone = fsd.getECZone(iip);
|
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
|
||||||
|
fsd.getFSNamesystem(), iip);
|
||||||
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
||||||
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
||||||
|
|
||||||
|
@ -495,7 +495,8 @@ static INodeFile addFileForEditLog(
|
|||||||
INodesInPath iip = fsd.addINode(existing, newNode);
|
INodesInPath iip = fsd.addINode(existing, newNode);
|
||||||
if (iip != null) {
|
if (iip != null) {
|
||||||
// check if the file is in an EC zone
|
// check if the file is in an EC zone
|
||||||
if (fsd.isInECZone(iip)) {
|
if (FSDirErasureCodingOp.isInErasureCodingZone(fsd.getFSNamesystem(),
|
||||||
|
iip)) {
|
||||||
newNode.addStripedBlocksFeature();
|
newNode.addStripedBlocksFeature();
|
||||||
}
|
}
|
||||||
if (aclEntries != null) {
|
if (aclEntries != null) {
|
||||||
@ -530,7 +531,8 @@ private static BlockInfo addBlock(FSDirectory fsd, String path,
|
|||||||
// associate new last block for the file
|
// associate new last block for the file
|
||||||
final BlockInfo blockInfo;
|
final BlockInfo blockInfo;
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
ECSchema ecSchema = fsd.getECSchema(inodesInPath);
|
ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
|
fsd.getFSNamesystem(), inodesInPath);
|
||||||
short numDataUnits = (short) ecSchema.getNumDataUnits();
|
short numDataUnits = (short) ecSchema.getNumDataUnits();
|
||||||
short numParityUnits = (short) ecSchema.getNumParityUnits();
|
short numParityUnits = (short) ecSchema.getNumParityUnits();
|
||||||
short numLocations = (short) (numDataUnits + numParityUnits);
|
short numLocations = (short) (numDataUnits + numParityUnits);
|
||||||
@ -586,7 +588,9 @@ private static INodesInPath addFile(
|
|||||||
fsd.writeLock();
|
fsd.writeLock();
|
||||||
try {
|
try {
|
||||||
newiip = fsd.addINode(existing, newNode);
|
newiip = fsd.addINode(existing, newNode);
|
||||||
if (newiip != null && fsd.isInECZone(newiip)) {
|
if (newiip != null
|
||||||
|
&& FSDirErasureCodingOp.isInErasureCodingZone(fsd.getFSNamesystem(),
|
||||||
|
newiip)) {
|
||||||
newNode.addStripedBlocksFeature();
|
newNode.addStripedBlocksFeature();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -40,7 +40,6 @@
|
|||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||||
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
|
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
|
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
|
||||||
@ -58,7 +57,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArray;
|
import org.apache.hadoop.hdfs.util.ByteArray;
|
||||||
import org.apache.hadoop.hdfs.util.EnumCounters;
|
import org.apache.hadoop.hdfs.util.EnumCounters;
|
||||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -1225,38 +1223,6 @@ FileEncryptionInfo getFileEncryptionInfo(INode inode, int snapshotId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
XAttr createErasureCodingZone(String src, ECSchema schema, int cellSize)
|
|
||||||
throws IOException {
|
|
||||||
writeLock();
|
|
||||||
try {
|
|
||||||
return ecZoneManager.createErasureCodingZone(src, schema, cellSize);
|
|
||||||
} finally {
|
|
||||||
writeUnlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isInECZone(INodesInPath iip) throws IOException {
|
|
||||||
return getECSchema(iip) != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
ECSchema getECSchema(INodesInPath iip) throws IOException {
|
|
||||||
readLock();
|
|
||||||
try {
|
|
||||||
return ecZoneManager.getECSchema(iip);
|
|
||||||
} finally {
|
|
||||||
readUnlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ErasureCodingZone getECZone(INodesInPath iip) throws IOException {
|
|
||||||
readLock();
|
|
||||||
try {
|
|
||||||
return ecZoneManager.getECZone(iip);
|
|
||||||
} finally {
|
|
||||||
readUnlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
|
static INode resolveLastINode(INodesInPath iip) throws FileNotFoundException {
|
||||||
INode inode = iip.getLastINode();
|
INode inode = iip.getLastINode();
|
||||||
if (inode == null) {
|
if (inode == null) {
|
||||||
|
@ -417,8 +417,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
|||||||
// Update the salient file attributes.
|
// Update the salient file attributes.
|
||||||
newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
|
newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
|
||||||
newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
|
newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
|
||||||
updateBlocks(fsDir, addCloseOp, iip, newFile,
|
ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
fsDir.getECSchema(iip), fsDir.isInECZone(iip));
|
fsDir.getFSNamesystem(), iip);
|
||||||
|
updateBlocks(fsDir, addCloseOp, iip, newFile, ecSchema);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case OP_CLOSE: {
|
case OP_CLOSE: {
|
||||||
@ -438,8 +439,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
|||||||
// Update the salient file attributes.
|
// Update the salient file attributes.
|
||||||
file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
|
file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
|
||||||
file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
|
file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
|
||||||
updateBlocks(fsDir, addCloseOp, iip, file,
|
ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
fsDir.getECSchema(iip), fsDir.isInECZone(iip));
|
fsDir.getFSNamesystem(), iip);
|
||||||
|
updateBlocks(fsDir, addCloseOp, iip, file, ecSchema);
|
||||||
|
|
||||||
// Now close the file
|
// Now close the file
|
||||||
if (!file.isUnderConstruction() &&
|
if (!file.isUnderConstruction() &&
|
||||||
@ -497,8 +499,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
|||||||
INodesInPath iip = fsDir.getINodesInPath(path, true);
|
INodesInPath iip = fsDir.getINodesInPath(path, true);
|
||||||
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
|
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
|
||||||
// Update in-memory data structures
|
// Update in-memory data structures
|
||||||
updateBlocks(fsDir, updateOp, iip, oldFile,
|
ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
fsDir.getECSchema(iip), fsDir.isInECZone(iip));
|
fsDir.getFSNamesystem(), iip);
|
||||||
|
updateBlocks(fsDir, updateOp, iip, oldFile, ecSchema);
|
||||||
|
|
||||||
if (toAddRetryCache) {
|
if (toAddRetryCache) {
|
||||||
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
|
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
|
||||||
@ -515,8 +518,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
|||||||
INodesInPath iip = fsDir.getINodesInPath(path, true);
|
INodesInPath iip = fsDir.getINodesInPath(path, true);
|
||||||
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
|
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
|
||||||
// add the new block to the INodeFile
|
// add the new block to the INodeFile
|
||||||
addNewBlock(addBlockOp, oldFile,
|
ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
fsDir.getECSchema(iip), fsDir.isInECZone(iip));
|
fsDir.getFSNamesystem(), iip);
|
||||||
|
addNewBlock(addBlockOp, oldFile, ecSchema);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case OP_SET_REPLICATION: {
|
case OP_SET_REPLICATION: {
|
||||||
@ -957,8 +961,7 @@ private static String formatEditLogReplayError(EditLogInputStream in,
|
|||||||
/**
|
/**
|
||||||
* Add a new block into the given INodeFile
|
* Add a new block into the given INodeFile
|
||||||
*/
|
*/
|
||||||
private void addNewBlock(AddBlockOp op, INodeFile file,
|
private void addNewBlock(AddBlockOp op, INodeFile file, ECSchema ecSchema)
|
||||||
ECSchema schema, boolean isStriped)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
BlockInfo[] oldBlocks = file.getBlocks();
|
BlockInfo[] oldBlocks = file.getBlocks();
|
||||||
Block pBlock = op.getPenultimateBlock();
|
Block pBlock = op.getPenultimateBlock();
|
||||||
@ -986,8 +989,9 @@ private void addNewBlock(AddBlockOp op, INodeFile file,
|
|||||||
}
|
}
|
||||||
// add the new block
|
// add the new block
|
||||||
final BlockInfo newBlockInfo;
|
final BlockInfo newBlockInfo;
|
||||||
|
boolean isStriped = ecSchema != null;
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock, schema);
|
newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock, ecSchema);
|
||||||
} else {
|
} else {
|
||||||
newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
|
newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
|
||||||
file.getPreferredBlockReplication());
|
file.getPreferredBlockReplication());
|
||||||
@ -1002,8 +1006,7 @@ private void addNewBlock(AddBlockOp op, INodeFile file,
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
||||||
INodesInPath iip, INodeFile file, ECSchema schema,
|
INodesInPath iip, INodeFile file, ECSchema ecSchema) throws IOException {
|
||||||
boolean isStriped) throws IOException {
|
|
||||||
// Update its block list
|
// Update its block list
|
||||||
BlockInfo[] oldBlocks = file.getBlocks();
|
BlockInfo[] oldBlocks = file.getBlocks();
|
||||||
Block[] newBlocks = op.getBlocks();
|
Block[] newBlocks = op.getBlocks();
|
||||||
@ -1062,6 +1065,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
|||||||
throw new IOException("Trying to delete non-existant block " + oldBlock);
|
throw new IOException("Trying to delete non-existant block " + oldBlock);
|
||||||
}
|
}
|
||||||
} else if (newBlocks.length > oldBlocks.length) {
|
} else if (newBlocks.length > oldBlocks.length) {
|
||||||
|
final boolean isStriped = ecSchema != null;
|
||||||
// We're adding blocks
|
// We're adding blocks
|
||||||
for (int i = oldBlocks.length; i < newBlocks.length; i++) {
|
for (int i = oldBlocks.length; i < newBlocks.length; i++) {
|
||||||
Block newBlock = newBlocks[i];
|
Block newBlock = newBlocks[i];
|
||||||
@ -1071,7 +1075,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
|||||||
// what about an old-version fsync() where fsync isn't called
|
// what about an old-version fsync() where fsync isn't called
|
||||||
// until several blocks in?
|
// until several blocks in?
|
||||||
if (isStriped) {
|
if (isStriped) {
|
||||||
newBI = new BlockInfoStripedUnderConstruction(newBlock, schema);
|
newBI = new BlockInfoStripedUnderConstruction(newBlock, ecSchema);
|
||||||
} else {
|
} else {
|
||||||
newBI = new BlockInfoContiguousUnderConstruction(newBlock,
|
newBI = new BlockInfoContiguousUnderConstruction(newBlock,
|
||||||
file.getPreferredBlockReplication());
|
file.getPreferredBlockReplication());
|
||||||
|
@ -2408,7 +2408,7 @@ private HdfsFileStatus startFileInt(final String src,
|
|||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
if (!isInECZone(src)) {
|
if (!FSDirErasureCodingOp.isInErasureCodingZone(this, src)) {
|
||||||
blockManager.verifyReplication(src, replication, clientMachine);
|
blockManager.verifyReplication(src, replication, clientMachine);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -3675,7 +3675,8 @@ void commitOrCompleteLastBlock(
|
|||||||
final long diff;
|
final long diff;
|
||||||
final short replicationFactor;
|
final short replicationFactor;
|
||||||
if (fileINode.isStriped()) {
|
if (fileINode.isStriped()) {
|
||||||
final ECSchema ecSchema = dir.getECSchema(iip);
|
final ECSchema ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(
|
||||||
|
this, iip);
|
||||||
final short numDataUnits = (short) ecSchema.getNumDataUnits();
|
final short numDataUnits = (short) ecSchema.getNumDataUnits();
|
||||||
final short numParityUnits = (short) ecSchema.getNumParityUnits();
|
final short numParityUnits = (short) ecSchema.getNumParityUnits();
|
||||||
|
|
||||||
@ -6670,11 +6671,16 @@ public CacheManager getCacheManager() {
|
|||||||
return cacheManager;
|
return cacheManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the schema manager. */
|
/** @return the ErasureCodingSchemaManager. */
|
||||||
public ErasureCodingSchemaManager getECSchemaManager() {
|
public ErasureCodingSchemaManager getErasureCodingSchemaManager() {
|
||||||
return ecSchemaManager;
|
return ecSchemaManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @return the ErasureCodingZoneManager. */
|
||||||
|
public ErasureCodingZoneManager getErasureCodingZoneManager() {
|
||||||
|
return dir.ecZoneManager;
|
||||||
|
}
|
||||||
|
|
||||||
@Override // NameNodeMXBean
|
@Override // NameNodeMXBean
|
||||||
public String getCorruptFiles() {
|
public String getCorruptFiles() {
|
||||||
List<String> list = new ArrayList<String>();
|
List<String> list = new ArrayList<String>();
|
||||||
@ -7579,47 +7585,25 @@ BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
|
|||||||
void createErasureCodingZone(final String srcArg, final ECSchema schema,
|
void createErasureCodingZone(final String srcArg, final ECSchema schema,
|
||||||
int cellSize, final boolean logRetryCache) throws IOException,
|
int cellSize, final boolean logRetryCache) throws IOException,
|
||||||
UnresolvedLinkException, SafeModeException, AccessControlException {
|
UnresolvedLinkException, SafeModeException, AccessControlException {
|
||||||
String src = srcArg;
|
|
||||||
HdfsFileStatus resultingStat = null;
|
|
||||||
FSPermissionChecker pc = null;
|
|
||||||
byte[][] pathComponents = null;
|
|
||||||
boolean success = false;
|
|
||||||
try {
|
|
||||||
checkSuperuserPrivilege();
|
checkSuperuserPrivilege();
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
pathComponents =
|
HdfsFileStatus resultingStat = null;
|
||||||
FSDirectory.getPathComponentsForReservedPath(src);
|
boolean success = false;
|
||||||
pc = getPermissionChecker();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
logAuditEvent(success, "createErasureCodingZone", srcArg);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
checkSuperuserPrivilege();
|
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
|
checkNameNodeSafeMode("Cannot create erasure coding zone on " + srcArg);
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
resultingStat = FSDirErasureCodingOp.createErasureCodingZone(this,
|
||||||
|
srcArg, schema, cellSize, logRetryCache);
|
||||||
final XAttr ecXAttr = dir.createErasureCodingZone(src, schema, cellSize);
|
|
||||||
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
|
|
||||||
xAttrs.add(ecXAttr);
|
|
||||||
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
|
|
||||||
final INodesInPath iip = dir.getINodesInPath4Write(src, false);
|
|
||||||
resultingStat = dir.getAuditFileInfo(iip);
|
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
if (success) {
|
||||||
getEditLog().logSync();
|
getEditLog().logSync();
|
||||||
logAuditEvent(success, "createErasureCodingZone", srcArg, null, resultingStat);
|
|
||||||
}
|
}
|
||||||
|
logAuditEvent(success, "createErasureCodingZone", srcArg, null,
|
||||||
private boolean isInECZone(String src) throws IOException {
|
resultingStat);
|
||||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
}
|
||||||
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
|
||||||
final INodesInPath iip = dir.getINodesInPath(src, true);
|
|
||||||
return dir.isInECZone(iip);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -7638,15 +7622,15 @@ ErasureCodingZone getErasureCodingZone(String src)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get available ECSchemas
|
* Get available erasure coding schemas
|
||||||
*/
|
*/
|
||||||
ECSchema[] getECSchemas() throws IOException {
|
ECSchema[] getErasureCodingSchemas() throws IOException {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
return ecSchemaManager.getSchemas();
|
return FSDirErasureCodingOp.getErasureCodingSchemas(this);
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
@ -7655,13 +7639,13 @@ ECSchema[] getECSchemas() throws IOException {
|
|||||||
/**
|
/**
|
||||||
* Get the ECSchema specified by the name
|
* Get the ECSchema specified by the name
|
||||||
*/
|
*/
|
||||||
ECSchema getECSchema(String schemaName) throws IOException {
|
ECSchema getErasureCodingSchema(String schemaName) throws IOException {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
waitForLoadingFSImage();
|
waitForLoadingFSImage();
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.READ);
|
checkOperation(OperationCategory.READ);
|
||||||
return ecSchemaManager.getSchema(schemaName);
|
return FSDirErasureCodingOp.getErasureCodingSchema(this, schemaName);
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
@ -7854,16 +7838,7 @@ private static void enableAsyncAuditLog() {
|
|||||||
@Override
|
@Override
|
||||||
public ErasureCodingZone getErasureCodingZoneForPath(String src)
|
public ErasureCodingZone getErasureCodingZoneForPath(String src)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final byte[][] pathComponents = FSDirectory
|
return FSDirErasureCodingOp.getErasureCodingZone(this, src);
|
||||||
.getPathComponentsForReservedPath(src);
|
|
||||||
final FSPermissionChecker pc = getPermissionChecker();
|
|
||||||
src = dir.resolvePath(pc, src, pathComponents);
|
|
||||||
final INodesInPath iip = dir.getINodesInPath(src, true);
|
|
||||||
if (isPermissionEnabled) {
|
|
||||||
dir.checkPathAccess(pc, iip, FsAction.READ);
|
|
||||||
}
|
}
|
||||||
return dir.getECZone(iip);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2037,7 +2037,7 @@ public void removeSpanReceiver(long id) throws IOException {
|
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public ECSchema[] getECSchemas() throws IOException {
|
public ECSchema[] getECSchemas() throws IOException {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
return namesystem.getECSchemas();
|
return namesystem.getErasureCodingSchemas();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
|
@ -572,9 +572,17 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
|
|||||||
// count expected replicas
|
// count expected replicas
|
||||||
short targetFileReplication;
|
short targetFileReplication;
|
||||||
if(file.getReplication() == 0) {
|
if(file.getReplication() == 0) {
|
||||||
INode inode = namenode.getNamesystem().getFSDirectory().getINode(path);
|
final FSNamesystem fsn = namenode.getNamesystem();
|
||||||
|
final ECSchema ecSchema;
|
||||||
|
fsn.readLock();
|
||||||
|
try {
|
||||||
|
INode inode = namenode.getNamesystem().getFSDirectory()
|
||||||
|
.getINode(path);
|
||||||
INodesInPath iip = INodesInPath.fromINode(inode);
|
INodesInPath iip = INodesInPath.fromINode(inode);
|
||||||
ECSchema ecSchema = namenode.getNamesystem().getFSDirectory().getECSchema(iip);
|
ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(fsn, iip);
|
||||||
|
} finally {
|
||||||
|
fsn.readUnlock();
|
||||||
|
}
|
||||||
targetFileReplication = (short) (ecSchema.getNumDataUnits() + ecSchema.getNumParityUnits());
|
targetFileReplication = (short) (ecSchema.getNumDataUnits() + ecSchema.getNumParityUnits());
|
||||||
} else {
|
} else {
|
||||||
targetFileReplication = file.getReplication();
|
targetFileReplication = file.getReplication();
|
||||||
|
@ -71,8 +71,13 @@ public static LocatedBlocks getBlockLocations(NameNode namenode,
|
|||||||
public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
|
public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
|
||||||
boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
|
boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
|
||||||
StandbyException, IOException {
|
StandbyException, IOException {
|
||||||
|
namenode.getNamesystem().readLock();
|
||||||
|
try {
|
||||||
return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
|
return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
|
||||||
.getFSDirectory(), src, resolveLink);
|
.getFSDirectory(), src, resolveLink);
|
||||||
|
} finally {
|
||||||
|
namenode.getNamesystem().readUnlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean mkdirs(NameNode namenode, String src,
|
public static boolean mkdirs(NameNode namenode, String src,
|
||||||
|
Loading…
Reference in New Issue
Block a user