Merge r1404624 through r1405251 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1405253 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-11-03 01:38:41 +00:00
commit 554fb4d2b2
20 changed files with 328 additions and 150 deletions

View File

@@ -445,6 +445,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-3916. libwebhdfs testing code cleanup. (Jing Zhao via suresh)
 
+    HDFS-4143. Change blocks to private in INodeFile and renames isLink() to
+    isSymlink() in INode.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -536,6 +539,12 @@ Release 2.0.3-alpha - Unreleased
     HDFS-3809. Make BKJM use protobufs for all serialization with ZK.
     (Ivan Kelly via umamahesh)
 
+    HDFS-3804. TestHftpFileSystem fails intermittently with JDK7
+    (Trevor Robinson via daryn)
+
+    HDFS-4132. When libwebhdfs is not enabled, nativeMiniDfsClient frees
+    uninitialized memory (Colin Patrick McCabe via todd)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES

View File

@@ -313,7 +313,7 @@ INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
     }
     if(newParent == null)
       return null;
-    if(!newNode.isDirectory() && !newNode.isLink()) {
+    if(!newNode.isDirectory() && !newNode.isSymlink()) {
       // Add file->block mapping
       INodeFile newF = (INodeFile)newNode;
       BlockInfo[] blocks = newF.getBlocks();
@@ -533,7 +533,7 @@ boolean unprotectedRenameTo(String src, String dst, long timestamp)
     if (dst.equals(src)) {
       return true;
     }
-    if (srcInode.isLink() && 
+    if (srcInode.isSymlink() && 
         dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
@@ -655,7 +655,7 @@ boolean unprotectedRenameTo(String src, String dst, long timestamp,
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
-    if (srcInode.isLink() && 
+    if (srcInode.isSymlink() && 
         dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
       throw new FileAlreadyExistsException(
           "Cannot rename symlink "+src+" to its target "+dst);
@@ -819,7 +819,7 @@ Block[] unprotectedSetReplication(String src,
     if (inode == null) {
       return null;
     }
-    assert !inode.isLink();
+    assert !inode.isSymlink();
     if (inode.isDirectory()) {
       return null;
     }
@@ -851,7 +851,7 @@ long getPreferredBlockSize(String filename) throws UnresolvedLinkException,
       if (inode == null) {
        throw new FileNotFoundException("File does not exist: " + filename);
      }
-      if (inode.isDirectory() || inode.isLink()) {
+      if (inode.isDirectory() || inode.isSymlink()) {
        throw new IOException("Getting block size of non-file: "+ filename);
      }
      return ((INodeFile)inode).getPreferredBlockSize();
@@ -868,7 +868,7 @@ boolean exists(String src) throws UnresolvedLinkException {
       if (inode == null) {
         return false;
       }
-      return inode.isDirectory() || inode.isLink()
+      return inode.isDirectory() || inode.isSymlink()
         ? true
         : ((INodeFile)inode).getBlocks() != null;
     } finally {
@@ -968,7 +968,7 @@ public void unprotectedConcat(String target, String [] srcs, long timestamp)
     for(String src : srcs) {
       INodeFile srcInode = (INodeFile)getINode(src);
       allSrcInodes[i++] = srcInode;
-      totalBlocks += srcInode.blocks.length;
+      totalBlocks += srcInode.numBlocks();
     }
     trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
@@ -977,7 +977,7 @@ public void unprotectedConcat(String target, String [] srcs, long timestamp)
     for(INodeFile nodeToRemove: allSrcInodes) {
       if(nodeToRemove == null) continue;
       
-      nodeToRemove.blocks = null;
+      nodeToRemove.setBlocks(null);
       trgParent.removeChild(nodeToRemove);
       count++;
     }
@@ -1232,7 +1232,7 @@ Block[] getFileBlocks(String src) throws UnresolvedLinkException {
         return null;
       if (targetNode.isDirectory())
         return null;
-      if (targetNode.isLink()) 
+      if (targetNode.isSymlink()) 
         return null;
       return ((INodeFile)targetNode).getBlocks();
     } finally {
@@ -1846,7 +1846,7 @@ private static void updateCountForINodeWithQuota(INodeDirectory dir,
       if (child.isDirectory()) {
         updateCountForINodeWithQuota((INodeDirectory)child, 
                                      counts, nodesInPath);
-      } else if (child.isLink()) {
+      } else if (child.isSymlink()) {
         counts.nsCount += 1;
       } else { // reduce recursive calls
         counts.nsCount += 1;
@@ -2075,7 +2075,7 @@ private HdfsFileStatus createFileStatus(byte[] path, INode node) {
         node.getFsPermission(),
         node.getUserName(),
         node.getGroupName(),
-        node.isLink() ? ((INodeSymlink)node).getSymlink() : null,
+        node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
         path);
   }
@@ -2111,7 +2111,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(
         node.getFsPermission(),
         node.getUserName(),
         node.getGroupName(),
-        node.isLink() ? ((INodeSymlink)node).getSymlink() : null,
+        node.isSymlink() ? ((INodeSymlink)node).getSymlink() : null,
         path,
         loc);
   }
@@ -2182,7 +2182,7 @@ INodeSymlink unprotectedSymlink(String path, String target, long modTime,
    */
  void cacheName(INode inode) {
    // Name is cached only for files
-   if (inode.isDirectory() || inode.isLink()) {
+   if (inode.isDirectory() || inode.isSymlink()) {
      return;
    }
    ByteArray name = new ByteArray(inode.getLocalNameBytes());
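
Several of these call sites only cast to INodeSymlink after isSymlink() returns true, for example the rename guard that refuses to rename a symlink onto its own target. A minimal, self-contained sketch of that pattern, using simplified stand-in classes rather than the real HDFS inode types:

// Simplified stand-ins for the HDFS inode classes; names are illustrative only.
abstract class Node {
  public boolean isSymlink() { return false; }   // default for files/directories
}

class FileNode extends Node { }

class SymlinkNode extends Node {
  private final String target;
  SymlinkNode(String target) { this.target = target; }
  @Override public boolean isSymlink() { return true; }
  String getLinkValue() { return target; }
}

public class RenameGuardSketch {
  // Mirrors the check in unprotectedRenameTo: refuse to rename a symlink
  // onto the path it points at.
  static void checkRename(Node src, String srcPath, String dstPath) {
    if (src.isSymlink() && dstPath.equals(((SymlinkNode) src).getLinkValue())) {
      throw new IllegalArgumentException(
          "Cannot rename symlink " + srcPath + " to its target " + dstPath);
    }
  }

  public static void main(String[] args) {
    checkRename(new FileNode(), "/a", "/b");            // allowed
    try {
      checkRename(new SymlinkNode("/b"), "/a", "/b");   // rejected
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}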

View File

@@ -162,7 +162,7 @@ static void saveINode2Image(INode node,
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),
                              filePerm);
-    } else if (node.isLink()) {
+    } else if (node.isSymlink()) {
       out.writeShort(0);  // replication
       out.writeLong(0);   // modification time
       out.writeLong(0);   // access time

View File

@@ -1394,7 +1394,7 @@ private void concatInternal(String target, String [] srcs)
           + target + " is under construction");
     }
     // per design target shouldn't be empty and all the blocks same size
-    if(trgInode.blocks.length == 0) {
+    if(trgInode.numBlocks() == 0) {
       throw new HadoopIllegalArgumentException("concat: target file "
           + target + " is empty");
     }
@@ -1406,10 +1406,10 @@ private void concatInternal(String target, String [] srcs)
     long blockSize = trgInode.getPreferredBlockSize();
 
     // check the end block to be full
-    if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+    final BlockInfo last = trgInode.getLastBlock();
+    if(blockSize != last.getNumBytes()) {
       throw new HadoopIllegalArgumentException("The last block in " + target
-          + " is not full; last block size = "
-          + trgInode.blocks[trgInode.blocks.length-1].getNumBytes()
+          + " is not full; last block size = " + last.getNumBytes()
           + " but file block size = " + blockSize);
     }
@@ -1426,7 +1426,7 @@ private void concatInternal(String target, String [] srcs)
       final INodeFile srcInode = INodeFile.valueOf(dir.getINode(src), src);
       if(src.isEmpty()
           || srcInode.isUnderConstruction()
-          || srcInode.blocks.length == 0) {
+          || srcInode.numBlocks() == 0) {
         throw new HadoopIllegalArgumentException("concat: source file " + src
             + " is invalid or empty or underConstruction");
       }
@@ -1443,15 +1443,16 @@ private void concatInternal(String target, String [] srcs)
       //boolean endBlock=false;
       // verify that all the blocks are of the same length as target
       // should be enough to check the end blocks
-      int idx = srcInode.blocks.length-1;
+      final BlockInfo[] srcBlocks = srcInode.getBlocks();
+      int idx = srcBlocks.length-1;
       if(endSrc)
-        idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
-      if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
         throw new HadoopIllegalArgumentException("concat: the soruce file "
             + src + " and the target file " + target
             + " should have the same blocks sizes: target block size is "
             + blockSize + " but the size of source block " + idx + " is "
-            + srcInode.blocks[idx].getNumBytes());
+            + srcBlocks[idx].getNumBytes());
       }
 
       si.add(srcInode);
@@ -1686,7 +1687,7 @@ private void verifyParentDir(String src) throws FileNotFoundException,
     if (parentNode == null) {
       throw new FileNotFoundException("Parent directory doesn't exist: "
           + parent.toString());
-    } else if (!parentNode.isDirectory() && !parentNode.isLink()) {
+    } else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
       throw new ParentNotDirectoryException("Parent path is not a directory: "
           + parent.toString());
     }
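
The preconditions enforced above reduce to arithmetic over block lengths: the target must be non-empty with a full last block, and each source's end block (except possibly the tail of the last source) must match the target's block size. A self-contained sketch of that validation over plain long arrays, with hypothetical names; it is not the FSNamesystem code, just the same checks:

public class ConcatCheckSketch {
  /** Validate files described only by their block lengths for concat. */
  static void checkConcat(long blockSize, long[] targetBlocks, long[][] sources) {
    if (targetBlocks.length == 0) {
      throw new IllegalArgumentException("concat: target file is empty");
    }
    long last = targetBlocks[targetBlocks.length - 1];
    if (last != blockSize) {
      throw new IllegalArgumentException(
          "last block of target is not full: " + last + " != " + blockSize);
    }
    for (int s = 0; s < sources.length; s++) {
      long[] src = sources[s];
      if (src.length == 0) {
        throw new IllegalArgumentException("concat: source " + s + " is empty");
      }
      boolean endSrc = (s == sources.length - 1);
      // the end block of the last source is allowed to be short
      int idx = endSrc ? src.length - 2 : src.length - 1;
      if (idx >= 0 && src[idx] != blockSize) {
        throw new IllegalArgumentException("source " + s + " block " + idx
            + " has size " + src[idx] + ", expected " + blockSize);
      }
    }
  }

  public static void main(String[] args) {
    long bs = 128;
    // accepted: target's last block is full, only the final source has a short tail
    checkConcat(bs, new long[]{128, 128}, new long[][]{{128, 128}, {128, 64}});
    System.out.println("accepted");
    try {
      // rejected: target's last block is not full
      checkConcat(bs, new long[]{128, 64}, new long[][]{{128}});
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}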

View File

@@ -183,7 +183,9 @@ void setPermission(FsPermission permission) {
   /**
    * Check whether it's a directory
    */
-  abstract boolean isDirectory();
+  public boolean isDirectory() {
+    return false;
+  }
 
   /**
    * Collect all the blocks in all children of this INode.
@@ -337,7 +339,7 @@ public boolean isUnderConstruction() {
   /**
    * Check whether it's a symlink
    */
-  public boolean isLink() {
+  public boolean isSymlink() {
     return false;
   }
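
The shape of this change, type-test methods that default to false in the base class and are overridden in exactly one subclass, can be shown in isolation. The classes below are simplified placeholders, not the real inode hierarchy:

abstract class Inode {
  /** Most inode kinds are not directories, so the base class answers false. */
  public boolean isDirectory() { return false; }

  /** Likewise, only the symlink subclass answers true. */
  public boolean isSymlink() { return false; }
}

class FileInode extends Inode {
  // Inherits both defaults; no boilerplate overrides needed, which is why the
  // isDirectory()/isLink() overrides in the file and symlink classes can be
  // deleted in the later hunks of this commit.
}

class DirectoryInode extends Inode {
  /** Final, so no subclass can pretend not to be a directory. */
  @Override public final boolean isDirectory() { return true; }
}

class SymlinkInode extends Inode {
  @Override public boolean isSymlink() { return true; }
}

public class InodeTypeSketch {
  public static void main(String[] args) {
    for (Inode n : new Inode[] { new FileInode(), new DirectoryInode(), new SymlinkInode() }) {
      System.out.println(n.getClass().getSimpleName()
          + ": dir=" + n.isDirectory() + ", symlink=" + n.isSymlink());
    }
  }
}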

View File

@@ -81,11 +81,9 @@ public INodeDirectory(INodeDirectory other) {
     this.children = other.getChildren();
   }
 
-  /**
-   * Check whether it's a directory
-   */
+  /** @return true unconditionally. */
   @Override
-  public boolean isDirectory() {
+  public final boolean isDirectory() {
     return true;
   }
@@ -214,7 +212,7 @@ INodesInPath getExistingPathINodes(byte[][] components, int numOfINodes,
       if (index >= 0) {
         existing.addNode(curNode);
       }
-      if (curNode.isLink() && (!lastComp || (lastComp && resolveLink))) {
+      if (curNode.isSymlink() && (!lastComp || (lastComp && resolveLink))) {
         final String path = constructPath(components, 0, components.length);
         final String preceding = constructPath(components, 0, count);
         final String remainder =

View File

@@ -55,7 +55,7 @@ public static INodeFile valueOf(INode inode, String path) throws IOException {
 
   private long header;
 
-  protected BlockInfo[] blocks = null;
+  protected BlockInfo[] blocks;
 
   INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
             short replication, long modificationTime,
@@ -63,7 +63,7 @@ public static INodeFile valueOf(INode inode, String path) throws IOException {
     super(permissions, modificationTime, atime);
     this.setFileReplication(replication);
     this.setPreferredBlockSize(preferredBlockSize);
-    blocks = blklist;
+    this.blocks = blklist;
   }
 
   protected INodeFile(INodeFile f) {
@@ -82,11 +82,6 @@ void setPermission(FsPermission permission) {
     super.setPermission(permission.applyUMask(UMASK));
   }
 
-  @Override
-  boolean isDirectory() {
-    return false;
-  }
-
   /** @return the replication factor of the file. */
   public final short getFileReplication() {
     return (short) ((header & HEADERMASK) >> BLOCKBITS);
@@ -138,7 +133,7 @@ void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
     for(BlockInfo bi: newlist) {
       bi.setBlockCollection(this);
     }
-    this.blocks = newlist;
+    setBlocks(newlist);
   }
 
   /**
@@ -146,14 +141,13 @@ void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
    */
   void addBlock(BlockInfo newblock) {
     if (this.blocks == null) {
-      this.blocks = new BlockInfo[1];
-      this.blocks[0] = newblock;
+      this.setBlocks(new BlockInfo[]{newblock});
     } else {
       int size = this.blocks.length;
       BlockInfo[] newlist = new BlockInfo[size + 1];
       System.arraycopy(this.blocks, 0, newlist, 0, size);
       newlist[size] = newblock;
-      this.blocks = newlist;
+      this.setBlocks(newlist);
     }
   }
@@ -162,6 +156,11 @@ public void setBlock(int idx, BlockInfo blk) {
     this.blocks[idx] = blk;
   }
 
+  /** Set the blocks. */
+  public void setBlocks(BlockInfo[] blocks) {
+    this.blocks = blocks;
+  }
+
   @Override
   protected int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
@@ -171,7 +170,7 @@ protected int collectSubtreeBlocksAndClear(List<Block> v) {
         blk.setBlockCollection(null);
       }
     }
-    blocks = null;
+    setBlocks(null);
     return 1;
   }
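
A compact, self-contained sketch of the encapsulation these hunks move toward: the block array is read and written only through accessors, even inside the class. The class below is a simplified stand-in, not the real INodeFile, and it assumes numBlocks() treats a null array as zero blocks, which is how the callers in this patch use it.

/** Simplified stand-in for a file inode whose block list is encapsulated. */
public class SimpleFileNode {
  private long[] blocks;          // stand-in for BlockInfo[]

  public long[] getBlocks() { return blocks; }

  public int numBlocks() { return blocks == null ? 0 : blocks.length; }

  /** Single choke point for mutation, useful for adding invariants later. */
  public void setBlocks(long[] blocks) { this.blocks = blocks; }

  /** Mirrors addBlock(): grow by one, always publishing through the setter. */
  public void addBlock(long newBlock) {
    if (blocks == null) {
      setBlocks(new long[] { newBlock });
    } else {
      long[] newList = new long[blocks.length + 1];
      System.arraycopy(blocks, 0, newList, 0, blocks.length);
      newList[blocks.length] = newBlock;
      setBlocks(newList);
    }
  }

  public static void main(String[] args) {
    SimpleFileNode f = new SimpleFileNode();
    f.addBlock(1L);
    f.addBlock(2L);
    System.out.println("blocks=" + f.numBlocks());  // 2
    f.setBlocks(null);                              // e.g. after a concat removal
    System.out.println("blocks=" + f.numBlocks());  // 0
  }
}

Routing every write through setBlocks() gives the class one place to enforce invariants, which is what the callers in FSDirectory and FSNamesystem above are adjusted to respect.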

View File

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -28,8 +29,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
-import com.google.common.base.Joiner;
-
 /**
  * I-node for file being written.
  */
@@ -109,9 +108,9 @@ public boolean isUnderConstruction() {
   // use the modification time as the access time
   //
   INodeFile convertToInodeFile() {
-    assert allBlocksComplete() :
-      "Can't finalize inode " + this + " since it contains " +
-      "non-complete blocks! Blocks are: " + blocksAsString();
+    assert allBlocksComplete() : "Can't finalize inode " + this
+      + " since it contains non-complete blocks! Blocks are "
+      + Arrays.asList(getBlocks());
     //TODO SNAPSHOT: may convert to INodeFileWithLink
     INodeFile obj = new INodeFile(getPermissionStatus(),
                                   getBlocks(),
@@ -127,7 +126,7 @@ assert allBlocksComplete() :
    * @return true if all of the blocks in this file are marked as completed.
    */
   private boolean allBlocksComplete() {
-    for (BlockInfo b : blocks) {
+    for (BlockInfo b : getBlocks()) {
       if (!b.isComplete()) {
         return false;
       }
@@ -140,6 +139,7 @@ private boolean allBlocksComplete() {
    * the last one on the list.
    */
   void removeLastBlock(Block oldblock) throws IOException {
+    final BlockInfo[] blocks = getBlocks();
     if (blocks == null) {
       throw new IOException("Trying to delete non-existant block " + oldblock);
     }
@@ -151,7 +151,7 @@ void removeLastBlock(Block oldblock) throws IOException {
     //copy to a new list
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
-    blocks = newlist;
+    setBlocks(newlist);
   }
 
   /**
@@ -160,11 +160,9 @@ void removeLastBlock(Block oldblock) throws IOException {
    */
   @Override
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeDescriptor[] targets)
-      throws IOException {
-    if (blocks == null || blocks.length == 0) {
-      throw new IOException("Trying to update non-existant block. " +
-          "File is empty.");
+      DatanodeDescriptor[] targets) throws IOException {
+    if (numBlocks() == 0) {
+      throw new IOException("Failed to set last block: File is empty.");
     }
     BlockInfoUnderConstruction ucBlock =
       lastBlock.convertToBlockUnderConstruction(
@@ -173,8 +171,4 @@ public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
     setBlock(numBlocks()-1, ucBlock);
     return ucBlock;
   }
-
-  private String blocksAsString() {
-    return Joiner.on(",").join(this.blocks);
-  }
 }

View File

@@ -49,7 +49,7 @@ public INodeSymlink(INodeSymlink that) {
   }
 
   @Override
-  public boolean isLink() {
+  public boolean isSymlink() {
     return true;
   }
@@ -81,9 +81,4 @@ long[] computeContentSummary(long[] summary) {
     summary[1]++; // Increment the file count
     return summary;
   }
-
-  @Override
-  public boolean isDirectory() {
-    return false;
-  }
 }

View File

@@ -44,11 +44,11 @@ struct NativeMiniDfsCluster {
 struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
 {
     struct NativeMiniDfsCluster* cl = NULL;
-    jobject bld = NULL, bld2 = NULL, cobj = NULL;
+    jobject bld = NULL, cobj = NULL, cluster = NULL;
     jvalue  val;
     JNIEnv *env = getJNIEnv();
     jthrowable jthr;
-    jstring jconfStr;
+    jstring jconfStr = NULL;
 
     if (!env) {
         fprintf(stderr, "nmdCreate: unable to construct JNIEnv.\n");
@@ -63,14 +63,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
     if (jthr) {
         printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                               "nmdCreate: new Configuration");
-        goto error_free_cl;
+        goto error;
     }
     if (conf->webhdfsEnabled) {
         jthr = newJavaStr(env, DFS_WEBHDFS_ENABLED_KEY, &jconfStr);
         if (jthr) {
             printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                   "nmdCreate: new String");
-            goto error_dlr_cobj;
+            goto error;
         }
         jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
                             "setBoolean", "(Ljava/lang/String;Z)V",
@@ -78,7 +78,7 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
         if (jthr) {
             printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                                   "nmdCreate: Configuration::setBoolean");
-            goto error_dlr_cobj;
+            goto error;
         }
     }
     jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
@@ -86,58 +86,53 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
     if (jthr) {
         printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                               "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
-        goto error_dlr_cobj;
+        goto error;
     }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
                         "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
     if (jthr) {
         printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
                               "Builder::format");
-        goto error_dlr_bld;
+        goto error;
     }
-    bld2 = val.l;
+    (*env)->DeleteLocalRef(env, val.l);
     if (conf->webhdfsEnabled) {
-        jthr = invokeMethod(env, &val, INSTANCE, bld2, MINIDFS_CLUSTER_BUILDER,
+        jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
                         "nameNodeHttpPort", "(I)L" MINIDFS_CLUSTER_BUILDER ";",
                         conf->namenodeHttpPort);
         if (jthr) {
             printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
                                   "Builder::nameNodeHttpPort");
-            goto error_dlr_bld2;
+            goto error;
        }
+        (*env)->DeleteLocalRef(env, val.l);
     }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "build", "()L" MINIDFS_CLUSTER ";");
     if (jthr) {
         printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                               "nmdCreate: Builder#build");
-        goto error_dlr_bld2;
+        goto error;
     }
-    cl->obj = (*env)->NewGlobalRef(env, val.l);
+    cluster = val.l;
+    cl->obj = (*env)->NewGlobalRef(env, val.l);
     if (!cl->obj) {
         printPendingExceptionAndFree(env, PRINT_EXC_ALL,
             "nmdCreate: NewGlobalRef");
-        goto error_dlr_val;
+        goto error;
     }
-    (*env)->DeleteLocalRef(env, val.l);
-    (*env)->DeleteLocalRef(env, bld2);
+    (*env)->DeleteLocalRef(env, cluster);
     (*env)->DeleteLocalRef(env, bld);
     (*env)->DeleteLocalRef(env, cobj);
     (*env)->DeleteLocalRef(env, jconfStr);
     return cl;
 
-error_dlr_val:
-    (*env)->DeleteLocalRef(env, val.l);
-error_dlr_bld2:
-    (*env)->DeleteLocalRef(env, bld2);
-error_dlr_bld:
+error:
+    (*env)->DeleteLocalRef(env, cluster);
     (*env)->DeleteLocalRef(env, bld);
-error_dlr_cobj:
     (*env)->DeleteLocalRef(env, cobj);
     (*env)->DeleteLocalRef(env, jconfStr);
-error_free_cl:
     free(cl);
-error:
     return NULL;
 }
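
For reference, the Java API that these JNI calls drive is MiniDFSCluster's builder. A rough, hedged sketch of the equivalent Java sequence (assumes the HDFS test jar on the classpath; the port value and config key shown here are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class NativeMiniDfsEquivalent {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // mirrors the optional Configuration::setBoolean call in nmdCreate
    conf.setBoolean("dfs.webhdfs.enabled", true);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .format(true)         // Builder::format, as invoked from nmdCreate
        .nameNodeHttpPort(0)  // Builder::nameNodeHttpPort; 0 lets the OS pick
        .build();             // Builder#build
    try {
      System.out.println("NameNode RPC port: " + cluster.getNameNodePort());
    } finally {
      cluster.shutdown();
    }
  }
}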

View File

@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,18 +43,17 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.log4j.Level;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 
 public class TestHftpFileSystem {
   private static final Random RAN = new Random();
 
   private static Configuration config = null;
   private static MiniDFSCluster cluster = null;
-  private static FileSystem hdfs = null;
-  private static HftpFileSystem hftpFs = null;
   private static String blockPoolId = null;
+  private static String hftpUri = null;
+  private FileSystem hdfs = null;
+  private HftpFileSystem hftpFs = null;
 
   private static Path[] TEST_PATHS = new Path[] {
       // URI does not encode, Request#getPathInfo returns /foo
@@ -93,26 +93,33 @@ public static void setUp() throws IOException {
     config = new Configuration();
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
-    hdfs = cluster.getFileSystem();
     blockPoolId = cluster.getNamesystem().getBlockPoolId();
-    final String hftpUri =
+    hftpUri =
       "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
-    hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config);
   }
 
   @AfterClass
   public static void tearDown() throws IOException {
-    if (hdfs != null) {
-      hdfs.close();
-    }
-    if (hftpFs != null) {
-      hftpFs.close();
-    }
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
+  @Before
+  public void initFileSystems() throws IOException {
+    hdfs = cluster.getFileSystem();
+    hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config);
+    // clear out the namespace
+    for (FileStatus stat : hdfs.listStatus(new Path("/"))) {
+      hdfs.delete(stat.getPath(), true);
+    }
+  }
+
+  @After
+  public void resetFileSystems() throws IOException {
+    FileSystem.closeAll();
+  }
+
   /**
    * Test file creation and access with file names that need encoding.
    */
@@ -280,19 +287,8 @@ private void checkClosedStream(InputStream is) {
     assertEquals("Stream closed", ioe.getMessage());
   }
 
-  public void resetFileSystem() throws IOException {
-    // filesystem caching has a quirk/bug that it caches based on the user's
-    // given uri.  the result is if a filesystem is instantiated with no port,
-    // it gets the default port.  then if the default port is changed,
-    // and another filesystem is instantiated with no port, the prior fs
-    // is returned, not a new one using the changed port.  so let's flush
-    // the cache between tests...
-    FileSystem.closeAll();
-  }
-
   @Test
   public void testHftpDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hftp://localhost");
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
@@ -309,7 +305,6 @@ public void testHftpDefaultPorts() throws IOException {
 
   @Test
   public void testHftpCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -329,7 +324,6 @@ public void testHftpCustomDefaultPorts() throws IOException {
 
   @Test
   public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hftp://localhost:123");
     HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
@@ -346,7 +340,6 @@ public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
 
   @Test
   public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -368,7 +361,6 @@ public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
 
   @Test
   public void testHsftpDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hsftp://localhost");
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
@@ -385,7 +377,6 @@ public void testHsftpDefaultPorts() throws IOException {
 
   @Test
   public void testHsftpCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
@@ -405,7 +396,6 @@ public void testHsftpCustomDefaultPorts() throws IOException {
 
   @Test
   public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     URI uri = URI.create("hsftp://localhost:123");
     HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
@@ -422,7 +412,6 @@ public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
 
   @Test
   public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
-    resetFileSystem();
     Configuration conf = new Configuration();
     conf.setInt("dfs.http.port", 123);
     conf.setInt("dfs.https.port", 456);
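
The fix for the intermittent JDK7 failures is structural: JDK7 no longer guarantees test-method order, so shared static FileSystem fixtures plus the FileSystem cache could leak a stale instance between tests. A minimal, self-contained JUnit 4 sketch of the per-test fixture shape adopted above, with stand-in objects instead of the real MiniDFSCluster and FileSystem:

import static org.junit.Assert.assertNotNull;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class PerTestFixtureSketch {
  private static Object cluster;   // expensive, shared across all tests

  private Object fs;               // cheap, rebuilt for every test

  @BeforeClass
  public static void startCluster() {
    cluster = new Object();        // stand-in for MiniDFSCluster startup
  }

  @AfterClass
  public static void stopCluster() {
    cluster = null;                // stand-in for cluster.shutdown()
  }

  @Before
  public void initFileSystems() {
    fs = new Object();             // stand-in for FileSystem.get(...)
  }

  @After
  public void resetFileSystems() {
    // stand-in for FileSystem.closeAll(): drop any cached instances so a
    // test that changes default ports cannot be handed a stale filesystem
    fs = null;
  }

  @Test
  public void testUsesFreshFileSystem() {
    assertNotNull(cluster);
    assertNotNull(fs);
  }
}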

View File

@@ -64,6 +64,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -678,11 +679,11 @@ public void testFsckError() throws Exception {
       DFSTestUtil.waitReplication(fs, filePath, (short)1);
 
       // intentionally corrupt NN data structure
-      INodeFile node =
-          (INodeFile)cluster.getNamesystem().dir.rootDir.getNode(fileName,
-              true);
-      assertEquals(node.blocks.length, 1);
-      node.blocks[0].setNumBytes(-1L); // set the block length to be negative
+      INodeFile node = (INodeFile)cluster.getNamesystem().dir.rootDir.getNode(
+          fileName, true);
+      final BlockInfo[] blocks = node.getBlocks();
+      assertEquals(blocks.length, 1);
+      blocks[0].setNumBytes(-1L); // set the block length to be negative
 
       // run fsck and expect a failure with -1 as the error code
       String outStr = runFsck(conf, -1, true, fileName);

View File

@@ -623,6 +623,15 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4724. job history web ui applications page should be sorted to
     display last app first (tgraves via bobby)
 
+    MAPREDUCE-4746. The MR Application Master does not have a config to set
+    environment variables (Rob Parker via bobby)
+
+    MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+    Kumar Vavilapalli via jlowe)
+
+    MAPREDUCE-4763 repair test TestUmbilicalProtocolWithJobToken (Ivan A.
+    Veselovsky via bobby)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

View File

@@ -23,14 +23,17 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
@@ -826,16 +833,21 @@ public ClusterInfo getClusterInfo() {
   @Override
   public void start() {
 
+    amInfos = new LinkedList<AMInfo>();
+
     // Pull completedTasks etc from recovery
     if (inRecovery) {
       completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
       amInfos = recoveryServ.getAMInfos();
+    } else {
+      // Get the amInfos anyways irrespective of whether recovery is enabled or
+      // not IF this is not the first AM generation
+      if (appAttemptID.getAttemptId() != 1) {
+        amInfos.addAll(readJustAMInfos());
+      }
     }
 
-    // / Create the AMInfo for the current AppMaster
-    if (amInfos == null) {
-      amInfos = new LinkedList<AMInfo>();
-    }
+    // Current an AMInfo for the current AM generation.
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
@@ -893,6 +905,51 @@ public void start() {
     startJobs();
   }
 
+  private List<AMInfo> readJustAMInfos() {
+    List<AMInfo> amInfos = new ArrayList<AMInfo>();
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream =
+          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+            appAttemptID);
+      EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+      // All AMInfos are contiguous. Track when the first AMStartedEvent
+      // appears.
+      boolean amStartedEventsBegan = false;
+
+      HistoryEvent event;
+      while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+        if (event.getEventType() == EventType.AM_STARTED) {
+          if (!amStartedEventsBegan) {
+            // First AMStartedEvent.
+            amStartedEventsBegan = true;
+          }
+          AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+          amInfos.add(MRBuilderUtils.newAMInfo(
+            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+            amStartedEvent.getContainerId(),
+            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+            amStartedEvent.getNodeManagerPort(),
+            amStartedEvent.getNodeManagerHttpPort()));
+        } else if (amStartedEventsBegan) {
+          // This means AMStartedEvents began and this event is a
+          // non-AMStarted event.
+          // No need to continue reading all the other events.
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not parse the old history file. "
+          + "Will not have old AMinfos ", e);
+    } finally {
+      if (inputStream != null) {
+        IOUtils.closeQuietly(inputStream);
+      }
+    }
+    return amInfos;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a
    * workflow.

View File

@@ -34,7 +34,6 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public List<AMInfo> getAMInfos() {
   }
 
   private void parse() throws IOException {
-    // TODO: parse history file based on startCount
-    String jobName =
-        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
-        new Path(jobhistoryDir));
-    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
-        getConfig());
-    //read the previous history file
-    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
-    LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    FSDataInputStream in =
+        getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
     Exception parseException = parser.getParseException();
     if (parseException != null) {
-      LOG.info("Got an error parsing job-history file " + historyFile +
+      LOG.info("Got an error parsing job-history file" +
           ", ignoring incomplete events.", parseException);
     }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ private void parse() throws IOException {
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
 
+  public static FSDataInputStream getPreviousJobHistoryFileStream(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    String jobName =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+    Path histDirPath =
+        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    // read the previous history file
+    historyFile =
+        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+          jobName, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    return in;
+  }
+
   protected Dispatcher createRecoveryDispatcher() {
     return new RecoveryDispatcher();

View File

@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.util.Iterator;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.TestRecovery.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.junit.Test;
public class TestAMInfos {
@Test
public void testAMInfosWithoutRecoveryEnabled() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(1, 0, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long am1StartTime = app.getAllAMInfos().get(0).getStartTime();
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
TaskAttempt taskAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(taskAttempt, TaskAttemptState.RUNNING);
// stop the app
app.stop();
// rerun
app =
new MRAppWithHistory(1, 0, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
// in rerun the AMInfo will be recovered from previous run even if recovery
// is not enabled.
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask = it.next();
// There should be two AMInfos
List<AMInfo> amInfos = app.getAllAMInfos();
Assert.assertEquals(2, amInfos.size());
AMInfo amInfoOne = amInfos.get(0);
Assert.assertEquals(am1StartTime, amInfoOne.getStartTime());
app.stop();
}
}

View File

@@ -486,6 +486,9 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
 
+  public static final String MR_AM_ENV =
+      MR_AM_PREFIX + "env";
+
   public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
       "mapreduce.admin.map.child.java.opts";

View File

@@ -839,6 +839,33 @@
    mapreduce.job.end-notification.max.retry.interval</description>
 </property>
 
+<property>
+  <name>yarn.app.mapreduce.am.env</name>
+  <value></value>
+  <description>User added environment variables for the MR App Master
+  processes. Example :
+  1) A=foo This will set the env variable A to foo
+  2) B=$B:c This is inherit tasktracker's B env variable.
+  </description>
+</property>
+
+<property>
+  <name>yarn.app.mapreduce.am.command-opts</name>
+  <value>-Xmx1024m</value>
+  <description>Java opts for the MR App Master processes.
+  The following symbol, if present, will be interpolated: @taskid@ is replaced
+  by current TaskID. Any other occurrences of '@' will go unchanged.
+  For example, to enable verbose gc logging to a file named for the taskid in
+  /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
+        -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+
+  Usage of -Djava.library.path can cause programs to no longer function if
+  hadoop native libraries are used. These values should instead be set as part
+  of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and
+  mapreduce.reduce.env config settings.
+  </description>
+</property>
+
 <property>
   <name>yarn.app.mapreduce.am.job.task.listener.thread-count</name>
   <value>30</value>

View File

@@ -397,7 +397,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
     Vector<String> vargsFinal = new Vector<String>(8);
 
-    // Final commmand
+    // Final command
     StringBuilder mergedCommand = new StringBuilder();
     for (CharSequence str : vargs) {
       mergedCommand.append(str).append(" ");
@@ -411,6 +411,10 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, conf);
+
+    // Setup the environment variables (LD_LIBRARY_PATH, etc)
+    MRApps.setEnvFromInputString(environment,
+        conf.get(MRJobConfig.MR_AM_ENV));
 
     // Parse distributed cache
     MRApps.setupDistributedCache(jobConf, localResources);
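
Together, the MRJobConfig.MR_AM_ENV constant, the mapred-default.xml entry, and the MRApps.setEnvFromInputString() call above let a client pass extra environment variables to the MR ApplicationMaster container. A hedged client-side sketch (values and job name are placeholders; the comma-separated NAME=value syntax is the same one the task env settings use):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class AmEnvExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // yarn.app.mapreduce.am.env: comma-separated NAME=value pairs for the AM
    conf.set(MRJobConfig.MR_AM_ENV,
        "LD_LIBRARY_PATH=/opt/hadoop/native/lib,APP_MODE=debug");

    Job job = Job.getInstance(conf, "am-env-demo");
    // ... set mapper/reducer/input/output as usual before submitting ...
    System.out.println("AM env requested: " + conf.get(MRJobConfig.MR_AM_ENV));
  }
}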

View File

@@ -51,11 +51,14 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.log4j.Level;
-import org.junit.Ignore;
 import org.junit.Test;
 
-/** Unit tests for using Job Token over RPC. */
-@Ignore
+/** Unit tests for using Job Token over RPC.
+ *
+ * System properties required:
+ * -Djava.security.krb5.conf=.../hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/target/test-classes/krb5.conf
+ * -Djava.net.preferIPv4Stack=true
+ */
 public class TestUmbilicalProtocolWithJobToken {
   private static final String ADDRESS = "0.0.0.0";