HDFS-8289. Erasure Coding: add ECSchema to HdfsFileStatus. Contributed by Yong Zhang.
commit 9da927540f
parent a17cedb44c

@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.io.erasurecode.ECSchema;

 /** Interface that represents the over the wire information for a file.
  */
@@ -48,6 +49,8 @@ public class HdfsFileStatus {

   private final FileEncryptionInfo feInfo;
+
+  private final ECSchema schema;

   // Used by dir, not including dot and dotdot. Always zero for a regular file.
   private final int childrenNum;
   private final byte storagePolicy;
@@ -73,7 +76,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
       long blocksize, long modification_time, long access_time,
       FsPermission permission, String owner, String group, byte[] symlink,
       byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo,
-      byte storagePolicy) {
+      byte storagePolicy, ECSchema schema) {
     this.length = length;
     this.isdir = isdir;
     this.block_replication = (short)block_replication;
@@ -93,6 +96,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
     this.childrenNum = childrenNum;
     this.feInfo = feInfo;
     this.storagePolicy = storagePolicy;
+    this.schema = schema;
   }

   /**
@@ -250,6 +254,10 @@ public final FileEncryptionInfo getFileEncryptionInfo() {
     return feInfo;
   }

+  public ECSchema getECSchema() {
+    return schema;
+  }
+
   public final int getChildrenNum() {
     return childrenNum;
   }
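Note: with the schema now carried on HdfsFileStatus, a client can tell whether a path is erasure coded from a single getFileInfo() round trip. The helper below is a minimal sketch of that pattern and is not part of the patch; it assumes an existing DFSClient and that the path exists.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
    import org.apache.hadoop.io.erasurecode.ECSchema;

    // Hypothetical helper: report the layout of a path using only getFileInfo().
    static void printLayout(DFSClient dfsClient, String src) throws IOException {
      HdfsFileStatus stat = dfsClient.getFileInfo(src);  // assumes src exists
      ECSchema schema = stat.getECSchema();
      if (schema != null) {
        System.out.println(src + ": striped, " + schema.getNumDataUnits() + "+"
            + schema.getNumParityUnits() + ", cell size " + schema.getChunkSize());
      } else {
        System.out.println(src + ": contiguous, replication " + stat.getReplication());
      }
    }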
@@ -61,7 +61,7 @@ public SnapshottableDirectoryStatus(long modification_time, long access_time,
       int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
     this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
         access_time, permission, owner, group, null, localName, inodeId,
-        childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+        childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null);
     this.snapshotNumber = snapshotNumber;
     this.snapshotQuota = snapshotQuota;
     this.parentFullPath = parentFullPath;
@@ -132,7 +132,7 @@ static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
         blockSize, mTime, aTime, permission, owner, group,
         symlink, DFSUtilClient.string2Bytes(localName),
         fileId, childrenNum, null,
-        storagePolicy);
+        storagePolicy, null);
   }

   /** Convert a Json map to an ExtendedBlock object. */
@@ -189,3 +189,6 @@

     HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream.
     (Yi Liu via jing9)
+
+    HDFS-8289. Erasure Coding: add ECSchema to HdfsFileStatus. (Yong Zhang via
+    jing9)
@@ -1193,9 +1193,9 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
     // Get block info from namenode
     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
     try {
-      ErasureCodingInfo info = getErasureCodingInfo(src);
-      if (info != null) {
-        return new DFSStripedInputStream(this, src, verifyChecksum, info);
+      ECSchema schema = getFileInfo(src).getECSchema();
+      if (schema != null) {
+        return new DFSStripedInputStream(this, src, verifyChecksum, schema);
       } else {
         return new DFSInputStream(this, src, verifyChecksum);
       }
@@ -271,7 +271,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
     }
     Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
     final DFSOutputStream out;
-    if(stat.getReplication() == 0) {
+    if(stat.getECSchema() != null) {
       out = new DFSStripedOutputStream(dfsClient, src, stat,
           flag, progress, checksum, favoredNodes);
     } else {
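Note: both stream factories now key off the schema attached to the HdfsFileStatus rather than a separate getErasureCodingInfo() RPC or the replication == 0 sentinel, saving a namenode round trip per open/create (the TODO removed in the DFSStripedOutputStream hunk below referenced exactly this). A hedged sketch of how calling code can observe which read path was chosen; the helper is illustrative only and assumes an existing DFSClient:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DFSInputStream;

    // Hypothetical check: DFSStripedInputStream for files under an EC zone,
    // plain DFSInputStream otherwise.
    static String streamKind(DFSClient client, String src) throws IOException {
      DFSInputStream in = client.open(src);
      try {
        return in.getClass().getSimpleName();
      } finally {
        in.close();
      }
    }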
@@ -29,6 +29,7 @@
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;

+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -132,13 +133,13 @@ boolean include(long pos) {
   private final CompletionService<Integer> readingService;

   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
-      ErasureCodingInfo ecInfo) throws IOException {
+      ECSchema schema) throws IOException {
     super(dfsClient, src, verifyChecksum);
-    // ECInfo is restored from NN just before reading striped file.
-    assert ecInfo != null;
-    cellSize = ecInfo.getSchema().getChunkSize();
-    dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
-    parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
+
+    assert schema != null;
+    cellSize = schema.getChunkSize();
+    dataBlkNum = (short) schema.getNumDataUnits();
+    parityBlkNum = (short) schema.getNumParityUnits();
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
@@ -219,9 +219,7 @@ private StripedDataStreamer getLeadingStreamer() {
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }

-    // ECInfo is restored from NN just before writing striped files.
-    //TODO reduce an rpc call HDFS-8289
-    final ECSchema schema = dfsClient.getErasureCodingInfo(src).getSchema();
+    final ECSchema schema = stat.getECSchema();
     final int numParityBlocks = schema.getNumParityUnits();
     cellSize = schema.getChunkSize();
     numDataBlocks = schema.getNumDataUnits();
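Note: the DFSStripedInputStream constructor now takes the ECSchema directly instead of an ErasureCodingInfo wrapper. A hedged sketch of direct construction, mirroring the updated TestDFSStripedInputStream further down (the constructor is package-level, so this only compiles from the org.apache.hadoop.hdfs package, and it assumes a running DistributedFileSystem fs and an existing striped file):

    // Imports assumed: org.apache.hadoop.io.erasurecode.ECSchema,
    // org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager.
    ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
    DFSStripedInputStream in = new DFSStripedInputStream(
        fs.getClient(), "/striped/file", false, schema);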
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.io.erasurecode.ECSchema;

 /**
  * Interface that represents the over the wire information
@@ -58,10 +59,10 @@ public HdfsLocatedFileStatus(long length, boolean isdir,
       int block_replication, long blocksize, long modification_time,
       long access_time, FsPermission permission, String owner, String group,
       byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
-      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy) {
+      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy, ECSchema schema) {
     super(length, isdir, block_replication, blocksize, modification_time,
         access_time, permission, owner, group, symlink, path, fileId,
-        childrenNum, feInfo, storagePolicy);
+        childrenNum, feInfo, storagePolicy, schema);
     this.locations = locations;
   }

@@ -173,7 +173,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -136,9 +136,9 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaOptionEntryProto;
-import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingZoneInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
@@ -1505,7 +1505,8 @@ public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
         fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
         fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
         fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
-            : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+            : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
+        fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null);
   }

   public static SnapshottableDirectoryStatus convert(
@@ -1566,6 +1567,9 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
         builder.setLocations(PBHelper.convert(locations));
       }
     }
+    if(fs.getECSchema() != null) {
+      builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema()));
+    }
     return builder.build();
   }

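Note: PBHelper now maps the schema in both directions, guarded by hasEcSchema() on the wire message and a null check on HdfsFileStatus. A small hedged sanity check one could write against these helpers (the surrounding test class is hypothetical; the method names are the ones used in the hunks above):

    // Imports assumed: org.apache.hadoop.hdfs.protocolPB.PBHelper,
    // org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto,
    // org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager,
    // org.apache.hadoop.io.erasurecode.ECSchema.
    ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
    ECSchemaProto proto = PBHelper.convertECSchema(schema);
    ECSchema roundTripped = PBHelper.convertECSchema(proto);
    assert schema.equals(roundTripped);  // ECSchema equality is also relied on by the new test below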
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.namenode;

 import com.google.common.base.Preconditions;
+
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.io.erasurecode.ECSchema;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -315,7 +317,7 @@ static HdfsFileStatus getFileInfo(
     if (fsd.getINode4DotSnapshot(srcs) != null) {
       return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
           HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
-          HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+          HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null);
     }
     return null;
   }
@@ -382,7 +384,9 @@ static HdfsFileStatus createFileStatus(

     final FileEncryptionInfo feInfo = isRawPath ? null :
         fsd.getFileEncryptionInfo(node, snapshot, iip);
+
+    final ECSchema schema = fsd.getECSchema(iip);

     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -412,7 +416,8 @@ static HdfsFileStatus createFileStatus(
         node.getId(),
         childrenNum,
         feInfo,
-        storagePolicy);
+        storagePolicy,
+        schema);
   }

   private static INodeAttributes getINodeAttributes(
@@ -459,7 +464,8 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
     }
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;
+    final ECSchema schema = fsd.getECSchema(iip);

     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
             blocksize, node.getModificationTime(snapshot),
@@ -467,7 +473,7 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
             getPermissionForFileStatus(nodeAttrs, isEncrypted),
             nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
             node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-            node.getId(), loc, childrenNum, feInfo, storagePolicy);
+            node.getId(), loc, childrenNum, feInfo, storagePolicy, schema);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
@@ -23,25 +23,6 @@ package hadoop.hdfs;

 import "hdfs.proto";

-/**
- * ECSchema options entry
- */
-message ECSchemaOptionEntryProto {
-  required string key = 1;
-  required string value = 2;
-}
-
-/**
- * ECSchema for erasurecoding
- */
-message ECSchemaProto {
-  required string schemaName = 1;
-  required string codecName = 2;
-  required uint32 dataUnits = 3;
-  required uint32 parityUnits = 4;
-  repeated ECSchemaOptionEntryProto options = 5;
-}
-
 /**
  * ErasureCodingInfo
  */
@@ -303,6 +303,25 @@ message LocatedBlocksProto {
   optional FileEncryptionInfoProto fileEncryptionInfo = 6;
 }

+/**
+ * ECSchema options entry
+ */
+message ECSchemaOptionEntryProto {
+  required string key = 1;
+  required string value = 2;
+}
+
+/**
+ * ECSchema for erasurecoding
+ */
+message ECSchemaProto {
+  required string schemaName = 1;
+  required string codecName = 2;
+  required uint32 dataUnits = 3;
+  required uint32 parityUnits = 4;
+  repeated ECSchemaOptionEntryProto options = 5;
+}
+
 /**
  * Status of a file, directory or symlink
  * Optionally includes a file's block locations if requested by client on the rpc call.
@@ -337,6 +356,9 @@ message HdfsFileStatusProto {
   optional FileEncryptionInfoProto fileEncryptionInfo = 15;

   optional uint32 storagePolicy = 16 [default = 0]; // block storage policy id
+
+  // Optional field for erasure coding
+  optional ECSchemaProto ecSchema = 17;
 }

 /**
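Note: ECSchemaOptionEntryProto and ECSchemaProto move verbatim from erasurecoding.proto into hdfs.proto, presumably because erasurecoding.proto already imports hdfs.proto and the new optional field 17 of HdfsFileStatusProto needs ECSchemaProto in scope without creating a circular import; old clients simply ignore the unknown optional field. A hedged sketch of building the relocated message through the generated Java API (builder method names follow standard protoc conventions; the schema values are illustrative, not taken from the patch):

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;

    HdfsProtos.ECSchemaProto schemaProto = HdfsProtos.ECSchemaProto.newBuilder()
        .setSchemaName("RS-6-3")   // illustrative name
        .setCodecName("rs")
        .setDataUnits(6)
        .setParityUnits(3)
        .build();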
@@ -255,12 +255,12 @@ public Object answer(InvocationOnMock invocation)
     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0, null, (byte) 0)).when(mockNN).getFileInfo(anyString());
+            1010, 0, null, (byte) 0, null)).when(mockNN).getFileInfo(anyString());

     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0, null, (byte) 0))
+            1010, 0, null, (byte) 0, null))
         .when(mockNN)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@@ -24,7 +24,6 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -36,6 +35,7 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,8 +54,7 @@ public class TestDFSStripedInputStream {
   private DistributedFileSystem fs;
   private final Path dirPath = new Path("/striped");
   private Path filePath = new Path(dirPath, "file");
-  private ErasureCodingInfo info = new ErasureCodingInfo(filePath.toString(),
-      ErasureCodingSchemaManager.getSystemDefaultSchema());
+  private final ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
   private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
@@ -92,8 +91,8 @@ public void testGetBlock() throws Exception {
         NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
-    final DFSStripedInputStream in =
-        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
+    final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, schema);

     List<LocatedBlock> lbList = lbs.getLocatedBlocks();
     for (LocatedBlock aLbList : lbList) {
@@ -129,7 +128,7 @@ public void testPread() throws Exception {
     }
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(),
-            filePath.toString(), false, info);
+            filePath.toString(), false, schema);
     int readSize = BLOCK_GROUP_SIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);
@@ -156,8 +155,7 @@ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
     }
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         NUM_STRIPE_PER_BLOCK, false);
-    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, fileSize);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);

     assert lbs.getLocatedBlocks().size() == numBlocks;
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -175,7 +173,7 @@ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)

     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(),
-            false, info);
+            false, schema);

     byte[] expected = new byte[fileSize];

@@ -737,7 +737,7 @@ private static void mockCreate(ClientProtocol mcp,
             version, new byte[suite.getAlgorithmBlockSize()],
             new byte[suite.getAlgorithmBlockSize()],
             "fakeKey", "fakeVersion"),
-            (byte) 0))
+            (byte) 0, null))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFileStatusWithECschema {
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+
+  @Before
+  public void before() throws IOException {
+    cluster =
+        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+  }
+
+  @After
+  public void after() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileStatusWithECschema() throws Exception {
+    // test directory not in EC zone
+    final Path dir = new Path("/foo");
+    assertTrue(fs.mkdir(dir, FsPermission.getDirDefault()));
+    assertNull(client.getFileInfo(dir.toString()).getECSchema());
+    // test file not in EC zone
+    final Path file = new Path(dir, "foo");
+    fs.create(file).close();
+    assertNull(client.getFileInfo(file.toString()).getECSchema());
+    fs.delete(file, true);
+
+    final ECSchema schema1 = ErasureCodingSchemaManager.getSystemDefaultSchema();
+    // create EC zone on dir
+    fs.createErasureCodingZone(dir, schema1);
+    final ECSchema schame2 = client.getFileInfo(dir.toUri().getPath()).getECSchema();
+    assertNotNull(schame2);
+    assertTrue(schema1.equals(schame2));
+
+    // test file in EC zone
+    fs.create(file).close();
+    final ECSchema schame3 =
+        fs.getClient().getFileInfo(file.toUri().getPath()).getECSchema();
+    assertNotNull(schame3);
+    assertTrue(schema1.equals(schame3));
+  }
+}
@@ -354,12 +354,12 @@ public void testFactory() throws Exception {
     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0, null, (byte) 0)).when(mcp).getFileInfo(anyString());
+            1010, 0, null, (byte) 0, null)).when(mcp).getFileInfo(anyString());
     Mockito
         .doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, (byte) 0))
+                1010, 0, null, (byte) 0, null))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@@ -1198,7 +1198,7 @@ public void testFsckFileNotFound() throws Exception {

     HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
         blockSize, modTime, accessTime, perms, owner, group, symlink,
-        path, fileId, numChildren, null, storagePolicy);
+        path, fileId, numChildren, null, storagePolicy, null);
     Result res = new Result(conf);

     try {
@@ -65,7 +65,7 @@ public void testHdfsFileStatus() throws IOException {
     final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
         now, now + 10, new FsPermission((short) 0644), "user", "group",
         DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
-        HdfsConstants.GRANDFATHER_INODE_ID, 0, null, (byte) 0);
+        HdfsConstants.GRANDFATHER_INODE_ID, 0, null, (byte) 0, null);
     final FileStatus fstatus = toFileStatus(status, parent);
     System.out.println("status = " + status);
     System.out.println("fstatus = " + fstatus);