HDFS-4078. Handle replication in snapshots.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1400743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-10-22 00:11:25 +00:00
parent f29724956a
commit 9a0651b4b8
10 changed files with 70 additions and 29 deletions

View File

@ -18,3 +18,5 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4079. Add SnapshotManager which maintains a list for all the HDFS-4079. Add SnapshotManager which maintains a list for all the
snapshottable directories and supports snapshot methods such as setting a snapshottable directories and supports snapshot methods such as setting a
directory to snapshottable and creating a snapshot. (szetszwo) directory to snapshottable and creating a snapshot. (szetszwo)
HDFS-4078. Handle replication in snapshots. (szetszwo)

View File

@ -315,9 +315,19 @@ INodeFileSnapshot addFileSnapshot(String srcPath, String dstPath
//add destination snaplink //add destination snaplink
snapshot = addNode(dstPath, snapshot, UNKNOWN_DISK_SPACE); snapshot = addNode(dstPath, snapshot, UNKNOWN_DISK_SPACE);
if (snapshot != null && src.getClass() == INodeFile.class) { final INodeFileWithLink srcWithLink;
//created a snapshot and the source is an INodeFile, replace the source. if (snapshot != null) {
replaceNode(srcPath, src, new INodeFileWithLink(src)); //added snapshot node successfully, check source type,
if (src instanceof INodeFileWithLink) {
srcWithLink = (INodeFileWithLink)src;
} else {
//source is an INodeFile, replace the source.
srcWithLink = new INodeFileWithLink(src);
replaceNode(srcPath, src, srcWithLink);
}
//insert the snapshot to src's linked list.
srcWithLink.insert(snapshot);
} }
} finally { } finally {
writeUnlock(); writeUnlock();
@ -384,13 +394,13 @@ BlockInfo addBlock(String path,
// check quota limits and updated space consumed // check quota limits and updated space consumed
updateCount(inodes, inodes.length-1, 0, updateCount(inodes, inodes.length-1, 0,
fileINode.getPreferredBlockSize()*fileINode.getBlockReplication(), true); fileINode.getPreferredBlockSize()*fileINode.getFileReplication(), true);
// associate new last block for the file // associate new last block for the file
BlockInfoUnderConstruction blockInfo = BlockInfoUnderConstruction blockInfo =
new BlockInfoUnderConstruction( new BlockInfoUnderConstruction(
block, block,
fileINode.getBlockReplication(), fileINode.getFileReplication(),
BlockUCState.UNDER_CONSTRUCTION, BlockUCState.UNDER_CONSTRUCTION,
targets); targets);
getBlockManager().addBlockCollection(blockInfo, fileINode); getBlockManager().addBlockCollection(blockInfo, fileINode);
@ -481,7 +491,7 @@ void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
// update space consumed // update space consumed
INode[] pathINodes = getExistingPathINodes(path); INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0, updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getBlockReplication(), true); -fileNode.getPreferredBlockSize()*fileNode.getFileReplication(), true);
} }
/** /**
@ -860,13 +870,13 @@ Block[] unprotectedSetReplication(String src,
return null; return null;
} }
INodeFile fileNode = (INodeFile)inode; INodeFile fileNode = (INodeFile)inode;
final short oldRepl = fileNode.getBlockReplication(); final short oldRepl = fileNode.getFileReplication();
// check disk quota // check disk quota
long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl); long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
updateCount(inodes, inodes.length-1, 0, dsDelta, true); updateCount(inodes, inodes.length-1, 0, dsDelta, true);
fileNode.setReplication(replication); fileNode.setFileReplication(replication);
if (oldReplication != null) { if (oldReplication != null) {
oldReplication[0] = oldRepl; oldReplication[0] = oldRepl;
@ -2124,7 +2134,7 @@ private HdfsFileStatus createFileStatus(byte[] path, INode node) {
if (node instanceof INodeFile) { if (node instanceof INodeFile) {
INodeFile fileNode = (INodeFile)node; INodeFile fileNode = (INodeFile)node;
size = fileNode.computeFileSize(true); size = fileNode.computeFileSize(true);
replication = fileNode.getBlockReplication(); replication = fileNode.getFileReplication();
blocksize = fileNode.getPreferredBlockSize(); blocksize = fileNode.getPreferredBlockSize();
} }
return new HdfsFileStatus( return new HdfsFileStatus(
@ -2154,7 +2164,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(
if (node instanceof INodeFile) { if (node instanceof INodeFile) {
INodeFile fileNode = (INodeFile)node; INodeFile fileNode = (INodeFile)node;
size = fileNode.computeFileSize(true); size = fileNode.computeFileSize(true);
replication = fileNode.getBlockReplication(); replication = fileNode.getFileReplication();
blocksize = fileNode.getPreferredBlockSize(); blocksize = fileNode.getPreferredBlockSize();
loc = getFSNamesystem().getBlockManager().createLocatedBlocks( loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
fileNode.getBlocks(), fileNode.computeFileSize(false), fileNode.getBlocks(), fileNode.computeFileSize(false),

View File

@ -661,7 +661,7 @@ private void printStatistics(boolean force) {
public void logOpenFile(String path, INodeFileUnderConstruction newNode) { public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
AddOp op = AddOp.getInstance(cache.get()) AddOp op = AddOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getBlockReplication()) .setReplication(newNode.getFileReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime()) .setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())
@ -679,7 +679,7 @@ public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
public void logCloseFile(String path, INodeFile newNode) { public void logCloseFile(String path, INodeFile newNode) {
CloseOp op = CloseOp.getInstance(cache.get()) CloseOp op = CloseOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getBlockReplication()) .setReplication(newNode.getFileReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime()) .setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize()) .setBlockSize(newNode.getPreferredBlockSize())

View File

@ -126,7 +126,7 @@ static void writeINodeUnderConstruction(DataOutputStream out,
String path) String path)
throws IOException { throws IOException {
writeString(path, out); writeString(path, out);
out.writeShort(cons.getBlockReplication()); out.writeShort(cons.getFileReplication());
out.writeLong(cons.getModificationTime()); out.writeLong(cons.getModificationTime());
out.writeLong(cons.getPreferredBlockSize()); out.writeLong(cons.getPreferredBlockSize());
int nrBlocks = cons.getBlocks().length; int nrBlocks = cons.getBlocks().length;
@ -175,7 +175,7 @@ static void saveINode2Image(INode node,
filePerm); filePerm);
} else { } else {
INodeFile fileINode = (INodeFile)node; INodeFile fileINode = (INodeFile)node;
out.writeShort(fileINode.getBlockReplication()); out.writeShort(fileINode.getFileReplication());
out.writeLong(fileINode.getModificationTime()); out.writeLong(fileINode.getModificationTime());
out.writeLong(fileINode.getAccessTime()); out.writeLong(fileINode.getAccessTime());
out.writeLong(fileINode.getPreferredBlockSize()); out.writeLong(fileINode.getPreferredBlockSize());

View File

@ -1414,7 +1414,7 @@ private void concatInternal(String target, String [] srcs)
} }
si.add(trgInode); si.add(trgInode);
short repl = trgInode.getBlockReplication(); final short repl = trgInode.getFileReplication();
// now check the srcs // now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block boolean endSrc = false; // final src file doesn't have to have full end block
@ -1434,10 +1434,10 @@ private void concatInternal(String target, String [] srcs)
} }
// check replication and blocks size // check replication and blocks size
if(repl != srcInode.getBlockReplication()) { if(repl != srcInode.getFileReplication()) {
throw new IllegalArgumentException(src + " and " + target + " " + throw new IllegalArgumentException(src + " and " + target + " " +
"should have same replication: " "should have same replication: "
+ repl + " vs. " + srcInode.getBlockReplication()); + repl + " vs. " + srcInode.getFileReplication());
} }
//boolean endBlock=false; //boolean endBlock=false;
@ -1878,9 +1878,10 @@ private LocatedBlock startFileInternal(String src,
LocatedBlock prepareFileForWrite(String src, INodeFile file, LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode, String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
boolean writeToEditLog) throws IOException { boolean writeToEditLog) throws IOException {
//TODO SNAPSHOT: INodeFileUnderConstruction with link
INodeFileUnderConstruction cons = new INodeFileUnderConstruction( INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
file.getLocalNameBytes(), file.getLocalNameBytes(),
file.getBlockReplication(), file.getFileReplication(),
file.getModificationTime(), file.getModificationTime(),
file.getPreferredBlockSize(), file.getPreferredBlockSize(),
file.getBlocks(), file.getBlocks(),
@ -2194,7 +2195,7 @@ LocatedBlock getAdditionalBlock(String src,
fileLength = pendingFile.computeContentSummary().getLength(); fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize(); blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode(); clientNode = pendingFile.getClientNode();
replication = pendingFile.getBlockReplication(); replication = pendingFile.getFileReplication();
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -3157,7 +3158,7 @@ private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINod
if (diff > 0) { if (diff > 0) {
try { try {
String path = leaseManager.findPath(fileINode); String path = leaseManager.findPath(fileINode);
dir.updateSpaceConsumed(path, 0, -diff * fileINode.getBlockReplication()); dir.updateSpaceConsumed(path, 0, -diff*fileINode.getFileReplication());
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e); LOG.warn("Unexpected exception while updating disk space.", e);
} }

View File

@ -49,13 +49,13 @@ public class INodeFile extends INode implements BlockCollection {
short replication, long modificationTime, short replication, long modificationTime,
long atime, long preferredBlockSize) { long atime, long preferredBlockSize) {
super(permissions, modificationTime, atime); super(permissions, modificationTime, atime);
this.setReplication(replication); this.setFileReplication(replication);
this.setPreferredBlockSize(preferredBlockSize); this.setPreferredBlockSize(preferredBlockSize);
blocks = blklist; blocks = blklist;
} }
protected INodeFile(INodeFile f) { protected INodeFile(INodeFile f) {
this(f.getPermissionStatus(), f.getBlocks(), f.getBlockReplication(), this(f.getPermissionStatus(), f.getBlocks(), f.getFileReplication(),
f.getModificationTime(), f.getAccessTime(), f.getPreferredBlockSize()); f.getModificationTime(), f.getAccessTime(), f.getPreferredBlockSize());
} }
@ -75,12 +75,16 @@ boolean isDirectory() {
} }
/** @return the replication factor of the file. */ /** @return the replication factor of the file. */
@Override public final short getFileReplication() {
public short getBlockReplication() {
return (short) ((header & HEADERMASK) >> BLOCKBITS); return (short) ((header & HEADERMASK) >> BLOCKBITS);
} }
void setReplication(short replication) { @Override
public short getBlockReplication() {
return getFileReplication();
}
void setFileReplication(short replication) {
if(replication <= 0) if(replication <= 0)
throw new IllegalArgumentException("Unexpected value for the replication"); throw new IllegalArgumentException("Unexpected value for the replication");
header = ((long)replication << BLOCKBITS) | (header & ~HEADERMASK); header = ((long)replication << BLOCKBITS) | (header & ~HEADERMASK);
@ -220,7 +224,7 @@ private long diskspaceConsumed(Block[] blkArr) {
isUnderConstruction()) { isUnderConstruction()) {
size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes(); size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes();
} }
return size * getBlockReplication(); return size * getFileReplication();
} }
/** /**

View File

@ -102,9 +102,10 @@ INodeFile convertToInodeFile() {
assert allBlocksComplete() : assert allBlocksComplete() :
"Can't finalize inode " + this + " since it contains " + "Can't finalize inode " + this + " since it contains " +
"non-complete blocks! Blocks are: " + blocksAsString(); "non-complete blocks! Blocks are: " + blocksAsString();
//TODO SNAPSHOT: may convert to INodeFileWithLink
INodeFile obj = new INodeFile(getPermissionStatus(), INodeFile obj = new INodeFile(getPermissionStatus(),
getBlocks(), getBlocks(),
getBlockReplication(), getFileReplication(),
getModificationTime(), getModificationTime(),
getModificationTime(), getModificationTime(),
getPreferredBlockSize()); getPreferredBlockSize());

View File

@ -834,7 +834,7 @@ public void toXML(XMLOutputter doc) throws IOException {
doc.endTag(); doc.endTag();
doc.startTag("replication"); doc.startTag("replication");
doc.pcdata(""+inode.getBlockReplication()); doc.pcdata(""+inode.getFileReplication());
doc.endTag(); doc.endTag();
doc.startTag("disk_space_consumed"); doc.startTag("disk_space_consumed");

View File

@ -33,6 +33,7 @@ public class INodeFileWithLink extends INodeFile {
public INodeFileWithLink(INodeFile f) { public INodeFileWithLink(INodeFile f) {
super(f); super(f);
next = this;
} }
void setNext(INodeFileWithLink next) { void setNext(INodeFileWithLink next) {
@ -42,4 +43,26 @@ void setNext(INodeFileWithLink next) {
INodeFileWithLink getNext() { INodeFileWithLink getNext() {
return next; return next;
} }
/** Insert inode to the circular linked list. */
public void insert(INodeFileWithLink inode) {
inode.setNext(this.getNext());
this.setNext(inode);
}
/**
* @return the max file replication of the elements
* in the circular linked list.
*/
@Override
public short getBlockReplication() {
short max = getFileReplication();
for(INodeFileWithLink i = next; i != this; i = i.getNext()) {
final short replication = i.getFileReplication();
if (replication > max) {
max = replication;
}
}
return max;
}
} }

View File

@ -48,7 +48,7 @@ public void testReplication () {
FsPermission.getDefault()), null, replication, FsPermission.getDefault()), null, replication,
0L, 0L, preferredBlockSize); 0L, 0L, preferredBlockSize);
assertEquals("True has to be returned in this case", replication, assertEquals("True has to be returned in this case", replication,
inf.getBlockReplication()); inf.getFileReplication());
} }
/** /**