HDFS-4098. Add FileWithSnapshot, INodeFileUnderConstructionWithSnapshot and INodeFileUnderConstructionSnapshot for supporting append to snapshotted files.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1434966 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7856221d4a
commit
b71d386890
@ -109,3 +109,7 @@ Branch-2802 Snapshot (Unreleased)
|
||||
|
||||
HDFS-4407. Change INodeDirectoryWithSnapshot.Diff.combinePostDiff(..) to
|
||||
merge-sort like and keep the postDiff parameter unmodified. (szetszwo)
|
||||
|
||||
HDFS-4098. Add FileWithSnapshot, INodeFileUnderConstructionWithSnapshot and
|
||||
INodeFileUnderConstructionSnapshot for supporting append to snapshotted files.
|
||||
(szetszwo)
|
||||
|
@ -324,7 +324,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
|
||||
if (oldFile.isUnderConstruction()) {
|
||||
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
|
||||
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
|
||||
INodeFile newFile = ucFile.convertToInodeFile(ucFile.getModificationTime());
|
||||
INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
|
||||
fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile,
|
||||
iip.getLatestSnapshot());
|
||||
}
|
||||
|
@ -178,7 +178,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
@ -1357,10 +1357,11 @@ private LocatedBlocks getBlockLocationsUpdateTimes(String src,
|
||||
doAccessTime = false;
|
||||
}
|
||||
|
||||
long now = now();
|
||||
final INodesInPath iip = dir.getMutableINodesInPath(src);
|
||||
final INodesInPath iip = dir.getINodesInPath(src);
|
||||
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
|
||||
if (doAccessTime && isAccessTimeSupported()) {
|
||||
if (!iip.isSnapshot() //snapshots are readonly, so don't update atime.
|
||||
&& doAccessTime && isAccessTimeSupported()) {
|
||||
final long now = now();
|
||||
if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
|
||||
// if we have to set access time but we only have the readlock, then
|
||||
// restart this entire operation with the writeLock.
|
||||
@ -1488,7 +1489,7 @@ private void concatInternal(String target, String [] srcs)
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is empty");
|
||||
}
|
||||
if (trgInode instanceof INodeFileWithLink) {
|
||||
if (trgInode instanceof INodeFileWithSnapshot) {
|
||||
throw new HadoopIllegalArgumentException("concat: target file "
|
||||
+ target + " is in a snapshot");
|
||||
}
|
||||
@ -1981,18 +1982,12 @@ private LocatedBlock startFileInternal(String src,
|
||||
LocatedBlock prepareFileForWrite(String src, INodeFile file,
|
||||
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
|
||||
boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
|
||||
//TODO SNAPSHOT: INodeFileUnderConstruction with link
|
||||
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
|
||||
file.getId(),
|
||||
file.getLocalNameBytes(),
|
||||
file.getFileReplication(),
|
||||
file.getModificationTime(),
|
||||
file.getPreferredBlockSize(),
|
||||
file.getBlocks(),
|
||||
file.getPermissionStatus(),
|
||||
leaseHolder,
|
||||
clientMachine,
|
||||
clientNode);
|
||||
if (latestSnapshot != null) {
|
||||
file = (INodeFile)file.recordModification(latestSnapshot).left;
|
||||
}
|
||||
final INodeFileUnderConstruction cons = file.toUnderConstruction(
|
||||
leaseHolder, clientMachine, clientNode);
|
||||
|
||||
dir.replaceINodeFile(src, file, cons, latestSnapshot);
|
||||
leaseManager.addLease(cons.getClientName(), src);
|
||||
|
||||
@ -3301,7 +3296,7 @@ private void finalizeINodeFileUnderConstruction(String src,
|
||||
|
||||
// The file is no longer pending.
|
||||
// Create permanent INode, update blocks
|
||||
INodeFile newFile = pendingFile.convertToInodeFile(now());
|
||||
final INodeFile newFile = pendingFile.toINodeFile(now());
|
||||
dir.replaceINodeFile(src, pendingFile, newFile, latestSnapshot);
|
||||
|
||||
// close file and persist block allocations for this file
|
||||
|
@ -508,7 +508,7 @@ public long getAccessTime() {
|
||||
/**
|
||||
* Set last access time of inode.
|
||||
*/
|
||||
INode setAccessTime(long atime, Snapshot latest) {
|
||||
public INode setAccessTime(long atime, Snapshot latest) {
|
||||
Pair<? extends INode, ? extends INode> pair = recordModification(latest);
|
||||
INode nodeToUpdate = pair != null ? pair.left : this;
|
||||
nodeToUpdate.accessTime = atime;
|
||||
|
@ -32,7 +32,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
||||
@ -216,13 +216,13 @@ private final void replaceSelf(INodeDirectory newDir) {
|
||||
oldDir.setParent(null);
|
||||
}
|
||||
|
||||
/** Replace a child {@link INodeFile} with an {@link INodeFileWithLink}. */
|
||||
INodeFileWithLink replaceINodeFile(final INodeFile child) {
|
||||
/** Replace a child {@link INodeFile} with an {@link INodeFileWithSnapshot}. */
|
||||
INodeFileWithSnapshot replaceINodeFile(final INodeFile child) {
|
||||
assertChildrenNonNull();
|
||||
Preconditions.checkArgument(!(child instanceof INodeFileWithLink),
|
||||
Preconditions.checkArgument(!(child instanceof INodeFileWithSnapshot),
|
||||
"Child file is already an INodeFileWithLink, child=" + child);
|
||||
|
||||
final INodeFileWithLink newChild = new INodeFileWithLink(child);
|
||||
final INodeFileWithSnapshot newChild = new INodeFileWithSnapshot(child);
|
||||
final int i = searchChildrenForExistingINode(newChild);
|
||||
children.set(i, newChild);
|
||||
return newChild;
|
||||
|
@ -28,10 +28,11 @@
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileSnapshot;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/** I-node for closed file. */
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFile extends INode implements BlockCollection {
|
||||
@ -111,7 +112,7 @@ protected INodeFile(INodeFile that) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() {
|
||||
public Pair<? extends INodeFile, ? extends INodeFile> createSnapshotCopy() {
|
||||
return parent.replaceINodeFile(this).createSnapshotCopy();
|
||||
}
|
||||
|
||||
@ -121,6 +122,17 @@ public final boolean isFile() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Convert this file to an {@link INodeFileUnderConstruction}. */
|
||||
public INodeFileUnderConstruction toUnderConstruction(
|
||||
String clientName,
|
||||
String clientMachine,
|
||||
DatanodeDescriptor clientNode) {
|
||||
Preconditions.checkArgument(!(this instanceof INodeFileUnderConstruction),
|
||||
"file is already an INodeFileUnderConstruction");
|
||||
return new INodeFileUnderConstruction(this,
|
||||
clientName, clientMachine, clientNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link FsPermission} of this {@link INodeFile}.
|
||||
* Since this is a file,
|
||||
@ -141,7 +153,7 @@ public short getBlockReplication() {
|
||||
return getFileReplication();
|
||||
}
|
||||
|
||||
protected void setFileReplication(short replication, Snapshot latest) {
|
||||
public void setFileReplication(short replication, Snapshot latest) {
|
||||
if (latest != null) {
|
||||
final Pair<? extends INode, ? extends INode> p = recordModification(latest);
|
||||
if (p != null) {
|
||||
|
@ -30,6 +30,8 @@
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* I-node for file being written.
|
||||
*/
|
||||
@ -78,7 +80,17 @@ public static INodeFileUnderConstruction valueOf(INode inode, String path
|
||||
this.clientNode = clientNode;
|
||||
}
|
||||
|
||||
String getClientName() {
|
||||
protected INodeFileUnderConstruction(final INodeFile that,
|
||||
final String clientName,
|
||||
final String clientMachine,
|
||||
final DatanodeDescriptor clientNode) {
|
||||
super(that);
|
||||
this.clientName = clientName;
|
||||
this.clientMachine = clientMachine;
|
||||
this.clientNode = clientNode;
|
||||
}
|
||||
|
||||
public String getClientName() {
|
||||
return clientName;
|
||||
}
|
||||
|
||||
@ -86,11 +98,11 @@ void setClientName(String clientName) {
|
||||
this.clientName = clientName;
|
||||
}
|
||||
|
||||
String getClientMachine() {
|
||||
public String getClientMachine() {
|
||||
return clientMachine;
|
||||
}
|
||||
|
||||
DatanodeDescriptor getClientNode() {
|
||||
public DatanodeDescriptor getClientNode() {
|
||||
return clientNode;
|
||||
}
|
||||
|
||||
@ -102,31 +114,28 @@ public boolean isUnderConstruction() {
|
||||
return true;
|
||||
}
|
||||
|
||||
//
|
||||
// converts a INodeFileUnderConstruction into a INodeFile
|
||||
// use the modification time as the access time
|
||||
//
|
||||
INodeFile convertToInodeFile(long mtime) {
|
||||
assert allBlocksComplete() : "Can't finalize inode " + this
|
||||
+ " since it contains non-complete blocks! Blocks are "
|
||||
+ Arrays.asList(getBlocks());
|
||||
//TODO SNAPSHOT: may convert to INodeFileWithLink
|
||||
/**
|
||||
* Converts an INodeFileUnderConstruction to an INodeFile.
|
||||
* The original modification time is used as the access time.
|
||||
* The new modification is the specified mtime.
|
||||
*/
|
||||
protected INodeFile toINodeFile(long mtime) {
|
||||
assertAllBlocksComplete();
|
||||
|
||||
return new INodeFile(getId(), getLocalNameBytes(), getPermissionStatus(),
|
||||
mtime, getModificationTime(),
|
||||
getBlocks(), getFileReplication(), getPreferredBlockSize());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if all of the blocks in this file are marked as completed.
|
||||
*/
|
||||
private boolean allBlocksComplete() {
|
||||
for (BlockInfo b : getBlocks()) {
|
||||
if (!b.isComplete()) {
|
||||
return false;
|
||||
/** Assert all blocks are complete. */
|
||||
protected void assertAllBlocksComplete() {
|
||||
final BlockInfo[] blocks = getBlocks();
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
|
||||
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
|
||||
getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a block from the block list. This block should be
|
||||
|
@ -0,0 +1,166 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapINodeUpdateEntry;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* {@link INodeFile} with a link to the next element.
|
||||
* The link of all the snapshot files and the original file form a circular
|
||||
* linked list so that all elements are accessible by any of the elements.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface FileWithSnapshot {
|
||||
/** @return the {@link INodeFile} view of this object. */
|
||||
public INodeFile asINodeFile();
|
||||
|
||||
/** @return the next element. */
|
||||
public FileWithSnapshot getNext();
|
||||
|
||||
/** Set the next element. */
|
||||
public void setNext(FileWithSnapshot next);
|
||||
|
||||
/** Insert inode to the circular linked list. */
|
||||
public void insert(FileWithSnapshot inode);
|
||||
|
||||
/** Utility methods for the classes which implement the interface. */
|
||||
static class Util {
|
||||
|
||||
/** Replace the old file with the new file in the circular linked list. */
|
||||
static void replace(FileWithSnapshot oldFile, FileWithSnapshot newFile) {
|
||||
//set next element
|
||||
FileWithSnapshot i = oldFile.getNext();
|
||||
newFile.setNext(i);
|
||||
oldFile.setNext(null);
|
||||
//find previous element and update it
|
||||
for(; i.getNext() != oldFile; i = i.getNext());
|
||||
i.setNext(newFile);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the max file replication of the elements
|
||||
* in the circular linked list.
|
||||
*/
|
||||
static short getBlockReplication(final FileWithSnapshot file) {
|
||||
short max = file.asINodeFile().getFileReplication();
|
||||
// i may be null since next will be set to null when the INode is deleted
|
||||
for(FileWithSnapshot i = file.getNext();
|
||||
i != file && i != null;
|
||||
i = i.getNext()) {
|
||||
final short replication = i.asINodeFile().getFileReplication();
|
||||
if (replication > max) {
|
||||
max = replication;
|
||||
}
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the current inode from the circular linked list.
|
||||
* If some blocks at the end of the block list no longer belongs to
|
||||
* any other inode, collect them and update the block list.
|
||||
*/
|
||||
static int collectSubtreeBlocksAndClear(final FileWithSnapshot file,
|
||||
final BlocksMapUpdateInfo info) {
|
||||
final FileWithSnapshot next = file.getNext();
|
||||
Preconditions.checkState(next != file, "this is the only remaining inode.");
|
||||
|
||||
// There are other inode(s) using the blocks.
|
||||
// Compute max file size excluding this and find the last inode.
|
||||
long max = next.asINodeFile().computeFileSize(true);
|
||||
short maxReplication = next.asINodeFile().getFileReplication();
|
||||
FileWithSnapshot last = next;
|
||||
for(FileWithSnapshot i = next.getNext(); i != file; i = i.getNext()) {
|
||||
final long size = i.asINodeFile().computeFileSize(true);
|
||||
if (size > max) {
|
||||
max = size;
|
||||
}
|
||||
final short rep = i.asINodeFile().getFileReplication();
|
||||
if (rep > maxReplication) {
|
||||
maxReplication = rep;
|
||||
}
|
||||
last = i;
|
||||
}
|
||||
|
||||
collectBlocksBeyondMaxAndClear(file, max, info);
|
||||
|
||||
// remove this from the circular linked list.
|
||||
last.setNext(next);
|
||||
// Set the replication of the current INode to the max of all the other
|
||||
// linked INodes, so that in case the current INode is retrieved from the
|
||||
// blocksMap before it is removed or updated, the correct replication
|
||||
// number can be retrieved.
|
||||
file.asINodeFile().setFileReplication(maxReplication, null);
|
||||
file.setNext(null);
|
||||
// clear parent
|
||||
file.asINodeFile().setParent(null);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void collectBlocksBeyondMaxAndClear(final FileWithSnapshot file,
|
||||
final long max, final BlocksMapUpdateInfo info) {
|
||||
final BlockInfo[] oldBlocks = file.asINodeFile().getBlocks();
|
||||
if (oldBlocks != null) {
|
||||
//find the minimum n such that the size of the first n blocks > max
|
||||
int n = 0;
|
||||
for(long size = 0; n < oldBlocks.length && max > size; n++) {
|
||||
size += oldBlocks[n].getNumBytes();
|
||||
}
|
||||
|
||||
// Replace the INode for all the remaining blocks in blocksMap
|
||||
final FileWithSnapshot next = file.getNext();
|
||||
final BlocksMapINodeUpdateEntry entry = new BlocksMapINodeUpdateEntry(
|
||||
file.asINodeFile(), next.asINodeFile());
|
||||
if (info != null) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
info.addUpdateBlock(oldBlocks[i], entry);
|
||||
}
|
||||
}
|
||||
|
||||
// starting from block n, the data is beyond max.
|
||||
if (n < oldBlocks.length) {
|
||||
// resize the array.
|
||||
final BlockInfo[] newBlocks;
|
||||
if (n == 0) {
|
||||
newBlocks = null;
|
||||
} else {
|
||||
newBlocks = new BlockInfo[n];
|
||||
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
|
||||
}
|
||||
for(FileWithSnapshot i = next; i != file; i = i.getNext()) {
|
||||
i.asINodeFile().setBlocks(newBlocks);
|
||||
}
|
||||
|
||||
// collect the blocks beyond max.
|
||||
if (info != null) {
|
||||
for(; n < oldBlocks.length; n++) {
|
||||
info.addDeleteBlock(oldBlocks[n]);
|
||||
}
|
||||
}
|
||||
}
|
||||
file.asINodeFile().setBlocks(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -23,11 +23,11 @@
|
||||
* INode representing a snapshot of a file.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFileSnapshot extends INodeFileWithLink {
|
||||
public class INodeFileSnapshot extends INodeFileWithSnapshot {
|
||||
/** The file size at snapshot creation time. */
|
||||
final long size;
|
||||
|
||||
INodeFileSnapshot(INodeFileWithLink f) {
|
||||
INodeFileSnapshot(INodeFileWithSnapshot f) {
|
||||
super(f);
|
||||
this.size = f.computeFileSize(true);
|
||||
f.insert(this);
|
||||
|
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
||||
|
||||
/**
|
||||
* INode representing a snapshot of an {@link INodeFileUnderConstruction}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFileUnderConstructionSnapshot
|
||||
extends INodeFileUnderConstructionWithSnapshot {
|
||||
/** The file size at snapshot creation time. */
|
||||
final long size;
|
||||
|
||||
INodeFileUnderConstructionSnapshot(INodeFileUnderConstructionWithSnapshot f) {
|
||||
super(f, f.getClientName(), f.getClientMachine(), f.getClientNode());
|
||||
this.size = f.computeFileSize(true);
|
||||
f.insert(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long computeFileSize(boolean includesBlockInfoUnderConstruction) {
|
||||
//ignore includesBlockInfoUnderConstruction
|
||||
//since files in a snapshot are considered as closed.
|
||||
return size;
|
||||
}
|
||||
}
|
@ -0,0 +1,96 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
||||
|
||||
/**
|
||||
* Represent an {@link INodeFileUnderConstruction} that is snapshotted.
|
||||
* Note that snapshot files are represented by
|
||||
* {@link INodeFileUnderConstructionSnapshot}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFileUnderConstructionWithSnapshot
|
||||
extends INodeFileUnderConstruction implements FileWithSnapshot {
|
||||
private FileWithSnapshot next;
|
||||
|
||||
INodeFileUnderConstructionWithSnapshot(final FileWithSnapshot f,
|
||||
final String clientName,
|
||||
final String clientMachine,
|
||||
final DatanodeDescriptor clientNode) {
|
||||
super(f.asINodeFile(), clientName, clientMachine, clientNode);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected INodeFileWithSnapshot toINodeFile(final long mtime) {
|
||||
assertAllBlocksComplete();
|
||||
final long atime = getModificationTime();
|
||||
final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this);
|
||||
f.setModificationTime(mtime, null);
|
||||
f.setAccessTime(atime, null);
|
||||
Util.replace(this, f);
|
||||
return f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<? extends INodeFileUnderConstruction,
|
||||
INodeFileUnderConstructionSnapshot> createSnapshotCopy() {
|
||||
return new Pair<INodeFileUnderConstructionWithSnapshot,
|
||||
INodeFileUnderConstructionSnapshot>(
|
||||
this, new INodeFileUnderConstructionSnapshot(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public INodeFile asINodeFile() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileWithSnapshot getNext() {
|
||||
return next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNext(FileWithSnapshot next) {
|
||||
this.next = next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(FileWithSnapshot inode) {
|
||||
inode.setNext(this.getNext());
|
||||
this.setNext(inode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getBlockReplication() {
|
||||
return Util.getBlockReplication(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
|
||||
if (next == null || next == this) {
|
||||
// this is the only remaining inode.
|
||||
return super.collectSubtreeBlocksAndClear(info);
|
||||
} else {
|
||||
return Util.collectSubtreeBlocksAndClear(this, info);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,166 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
|
||||
/**
|
||||
* INodeFile with a link to the next element.
|
||||
* This class is used to represent the original file that is snapshotted.
|
||||
* The snapshot files are represented by {@link INodeFileSnapshot}.
|
||||
* The link of all the snapshot files and the original file form a circular
|
||||
* linked list so that all elements are accessible by any of the elements.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFileWithLink extends INodeFile {
|
||||
private INodeFileWithLink next;
|
||||
|
||||
public INodeFileWithLink(INodeFile f) {
|
||||
super(f);
|
||||
next = this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() {
|
||||
return new Pair<INodeFileWithLink, INodeFileSnapshot>(this,
|
||||
new INodeFileSnapshot(this));
|
||||
}
|
||||
|
||||
void setNext(INodeFileWithLink next) {
|
||||
this.next = next;
|
||||
}
|
||||
|
||||
INodeFileWithLink getNext() {
|
||||
return next;
|
||||
}
|
||||
|
||||
/** Insert inode to the circular linked list. */
|
||||
void insert(INodeFileWithLink inode) {
|
||||
inode.setNext(this.getNext());
|
||||
this.setNext(inode);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the max file replication of the elements
|
||||
* in the circular linked list.
|
||||
*/
|
||||
@Override
|
||||
public short getBlockReplication() {
|
||||
short max = getFileReplication();
|
||||
// i may be null since next will be set to null when the INode is deleted
|
||||
for(INodeFileWithLink i = next; i != this && i != null; i = i.getNext()) {
|
||||
final short replication = i.getFileReplication();
|
||||
if (replication > max) {
|
||||
max = replication;
|
||||
}
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* Remove the current inode from the circular linked list.
|
||||
* If some blocks at the end of the block list no longer belongs to
|
||||
* any other inode, collect them and update the block list.
|
||||
*/
|
||||
@Override
|
||||
public int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
|
||||
if (next == this) {
|
||||
// this is the only remaining inode.
|
||||
super.collectSubtreeBlocksAndClear(info);
|
||||
} else {
|
||||
// There are other inode(s) using the blocks.
|
||||
// Compute max file size excluding this and find the last inode.
|
||||
long max = next.computeFileSize(true);
|
||||
short maxReplication = next.getFileReplication();
|
||||
INodeFileWithLink last = next;
|
||||
for(INodeFileWithLink i = next.getNext(); i != this; i = i.getNext()) {
|
||||
final long size = i.computeFileSize(true);
|
||||
if (size > max) {
|
||||
max = size;
|
||||
}
|
||||
final short rep = i.getFileReplication();
|
||||
if (rep > maxReplication) {
|
||||
maxReplication = rep;
|
||||
}
|
||||
last = i;
|
||||
}
|
||||
|
||||
collectBlocksBeyondMaxAndClear(max, info);
|
||||
|
||||
// remove this from the circular linked list.
|
||||
last.next = this.next;
|
||||
// Set the replication of the current INode to the max of all the other
|
||||
// linked INodes, so that in case the current INode is retrieved from the
|
||||
// blocksMap before it is removed or updated, the correct replication
|
||||
// number can be retrieved.
|
||||
this.setFileReplication(maxReplication, null);
|
||||
this.next = null;
|
||||
// clear parent
|
||||
setParent(null);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
private void collectBlocksBeyondMaxAndClear(final long max,
|
||||
final BlocksMapUpdateInfo info) {
|
||||
final BlockInfo[] oldBlocks = getBlocks();
|
||||
if (oldBlocks != null) {
|
||||
//find the minimum n such that the size of the first n blocks > max
|
||||
int n = 0;
|
||||
for(long size = 0; n < oldBlocks.length && max > size; n++) {
|
||||
size += oldBlocks[n].getNumBytes();
|
||||
}
|
||||
|
||||
// Replace the INode for all the remaining blocks in blocksMap
|
||||
BlocksMapINodeUpdateEntry entry = new BlocksMapINodeUpdateEntry(this,
|
||||
next);
|
||||
if (info != null) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
info.addUpdateBlock(oldBlocks[i], entry);
|
||||
}
|
||||
}
|
||||
|
||||
// starting from block n, the data is beyond max.
|
||||
if (n < oldBlocks.length) {
|
||||
// resize the array.
|
||||
final BlockInfo[] newBlocks;
|
||||
if (n == 0) {
|
||||
newBlocks = null;
|
||||
} else {
|
||||
newBlocks = new BlockInfo[n];
|
||||
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
|
||||
}
|
||||
for(INodeFileWithLink i = next; i != this; i = i.getNext()) {
|
||||
i.setBlocks(newBlocks);
|
||||
}
|
||||
|
||||
// collect the blocks beyond max.
|
||||
if (info != null) {
|
||||
for(; n < oldBlocks.length; n++) {
|
||||
info.addDeleteBlock(oldBlocks[n]);
|
||||
}
|
||||
}
|
||||
}
|
||||
setBlocks(null);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.snapshot;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.Util;
|
||||
|
||||
/**
|
||||
* Represent an {@link INodeFile} that is snapshotted.
|
||||
* Note that snapshot files are represented by {@link INodeFileSnapshot}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class INodeFileWithSnapshot extends INodeFile
|
||||
implements FileWithSnapshot {
|
||||
private FileWithSnapshot next;
|
||||
|
||||
public INodeFileWithSnapshot(INodeFile f) {
|
||||
super(f);
|
||||
setNext(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public INodeFileUnderConstructionWithSnapshot toUnderConstruction(
|
||||
final String clientName,
|
||||
final String clientMachine,
|
||||
final DatanodeDescriptor clientNode) {
|
||||
final INodeFileUnderConstructionWithSnapshot f
|
||||
= new INodeFileUnderConstructionWithSnapshot(this,
|
||||
clientName, clientMachine, clientNode);
|
||||
Util.replace(this, f);
|
||||
return f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<INodeFileWithSnapshot, INodeFileSnapshot> createSnapshotCopy() {
|
||||
return new Pair<INodeFileWithSnapshot, INodeFileSnapshot>(this,
|
||||
new INodeFileSnapshot(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public INodeFile asINodeFile() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileWithSnapshot getNext() {
|
||||
return next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNext(FileWithSnapshot next) {
|
||||
this.next = next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(FileWithSnapshot inode) {
|
||||
inode.setNext(this.getNext());
|
||||
this.setNext(inode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public short getBlockReplication() {
|
||||
return Util.getBlockReplication(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
|
||||
if (next == null || next == this) {
|
||||
// this is the only remaining inode.
|
||||
return super.collectSubtreeBlocksAndClear(info);
|
||||
} else {
|
||||
return Util.collectSubtreeBlocksAndClear(this, info);
|
||||
}
|
||||
}
|
||||
}
|
@ -364,8 +364,7 @@ public void testSnapshotPathINodesWithAddedFile() throws Exception {
|
||||
* Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}
|
||||
* for snapshot file while modifying file after snapshot.
|
||||
*/
|
||||
// TODO: disable it temporarily since it uses append.
|
||||
// @Test
|
||||
@Test
|
||||
public void testSnapshotPathINodesAfterModification() throws Exception {
|
||||
//file1 was deleted, create it again.
|
||||
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);
|
||||
|
@ -270,10 +270,9 @@ private Modification[] prepareModifications(TestDirectoryTree.Node[] nodes)
|
||||
Modification delete = new FileDeletion(
|
||||
node.fileList.get((node.nullFileIndex + 1) % node.fileList.size()),
|
||||
hdfs);
|
||||
// TODO: fix append for snapshots
|
||||
// Modification append = new FileAppend(
|
||||
// node.fileList.get((node.nullFileIndex + 2) % node.fileList.size()),
|
||||
// hdfs, (int) BLOCKSIZE);
|
||||
Modification append = new FileAppend(
|
||||
node.fileList.get((node.nullFileIndex + 2) % node.fileList.size()),
|
||||
hdfs, (int) BLOCKSIZE);
|
||||
Modification chmod = new FileChangePermission(
|
||||
node.fileList.get((node.nullFileIndex + 3) % node.fileList.size()),
|
||||
hdfs, genRandomPermission());
|
||||
@ -290,8 +289,7 @@ private Modification[] prepareModifications(TestDirectoryTree.Node[] nodes)
|
||||
|
||||
mList.add(create);
|
||||
mList.add(delete);
|
||||
// TODO: fix append for snapshots
|
||||
// mList.add(append);
|
||||
mList.add(append);
|
||||
mList.add(chmod);
|
||||
mList.add(chown);
|
||||
mList.add(replication);
|
||||
|
@ -39,7 +39,7 @@
|
||||
/**
|
||||
* This class tests the replication handling/calculation of snapshots. In
|
||||
* particular, {@link INodeFile#getFileReplication()} and
|
||||
* {@link INodeFileWithLink#getBlockReplication()} are tested to make sure
|
||||
* {@link INodeFileWithSnapshot#getBlockReplication()} are tested to make sure
|
||||
* the number of replication is calculated correctly with/without snapshots.
|
||||
*/
|
||||
public class TestSnapshotReplication {
|
||||
@ -132,7 +132,7 @@ INodeFile getINodeFile(Path p) throws Exception {
|
||||
* INodes
|
||||
* @param expectedBlockRep
|
||||
* The expected replication number that should be returned by
|
||||
* {@link INodeFileWithLink#getBlockReplication()} of all the INodes
|
||||
* {@link INodeFileWithSnapshot#getBlockReplication()} of all the INodes
|
||||
* @throws Exception
|
||||
*/
|
||||
private void checkSnapshotFileReplication(Path currentFile,
|
||||
|
Loading…
Reference in New Issue
Block a user