HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov

This commit is contained in:
Chris Douglas 2017-09-05 23:30:18 -07:00
parent 63720ef574
commit 1f3bc63e67
9 changed files with 1275 additions and 63 deletions

View File

@ -20,6 +20,7 @@
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
/**
* Utilities for config variables of the viewFs See {@link ViewFs}
@ -68,6 +69,32 @@ public static void addLink(final Configuration conf, final String src,
src, target);
}
/**
*
* @param conf
* @param mountTableName
* @param src
* @param settings
* @param targets
*/
public static void addLinkNfly(Configuration conf, String mountTableName,
String src, String settings, final URI ... targets) {
settings = settings == null
? "minReplication=2,repairOnRead=true"
: settings;
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
StringUtils.uriToString(targets));
}
public static void addLinkNfly(final Configuration conf, final String src,
final URI ... targets) {
addLinkNfly(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, src, null,
targets);
}
/**
* Add config variable for homedir for default mount table
* @param conf - add to this conf

View File

@ -57,7 +57,13 @@ public interface Constants {
* Config variable for specifying a merge link
*/
public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
/**
* Config variable for specifying an nfly link. Nfly writes to multiple
* locations, and allows reads from the closest one.
*/
String CONFIG_VIEWFS_LINK_NFLY = "linkNfly";
/**
* Config variable for specifying a merge of the root of the mount-table
* with the root of another file system.

View File

@ -133,6 +133,12 @@ void addLink(final String pathComponent, final INodeLink<T> link)
}
}
enum LinkType {
SINGLE,
MERGE,
NFLY
}
/**
* An internal class to represent a mount link.
* A mount link can be single dir link or a merge dir link.
@ -146,19 +152,17 @@ void addLink(final String pathComponent, final INodeLink<T> link)
* is changed later it is then ignored (a dir with null entries)
*/
static class INodeLink<T> extends INode<T> {
final boolean isMergeLink; // true if MergeLink
final URI[] targetDirLinkList;
final T targetFileSystem; // file system object created from the link.
/**
* Construct a mergeLink.
* Construct a mergeLink or nfly.
*/
INodeLink(final String pathToNode, final UserGroupInformation aUgi,
final T targetMergeFs, final URI[] aTargetDirLinkList) {
super(pathToNode, aUgi);
targetFileSystem = targetMergeFs;
targetDirLinkList = aTargetDirLinkList;
isMergeLink = true;
}
/**
@ -170,7 +174,6 @@ static class INodeLink<T> extends INode<T> {
targetFileSystem = targetFs;
targetDirLinkList = new URI[1];
targetDirLinkList[0] = aTargetDirLink;
isMergeLink = false;
}
/**
@ -188,7 +191,9 @@ Path getTargetLink() {
}
private void createLink(final String src, final String target,
final boolean isLinkMerge, final UserGroupInformation aUgi)
final LinkType linkType, final String settings,
final UserGroupInformation aUgi,
final Configuration config)
throws URISyntaxException, IOException,
FileAlreadyExistsException, UnsupportedFileSystemException {
// Validate that src is valid absolute path
@ -235,18 +240,20 @@ private void createLink(final String src, final String target,
final INodeLink<T> newLink;
final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
+ iPath;
if (isLinkMerge) { // Target is list of URIs
String[] targetsList = StringUtils.getStrings(target);
URI[] targetsListURI = new URI[targetsList.length];
int k = 0;
for (String itarget : targetsList) {
targetsListURI[k++] = new URI(itarget);
}
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(targetsListURI), targetsListURI);
} else {
switch (linkType) {
case SINGLE:
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(new URI(target)), new URI(target));
break;
case MERGE:
case NFLY:
final URI[] targetUris = StringUtils.stringToURI(
StringUtils.getStrings(target));
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(settings, targetUris), targetUris);
break;
default:
throw new IllegalArgumentException(linkType + ": Infeasible linkType");
}
curInode.addLink(iPath, newLink);
mountPoints.add(new MountPoint<T>(src, newLink));
@ -257,14 +264,14 @@ private void createLink(final String src, final String target,
* 3 abstract methods.
* @throws IOException
*/
protected abstract T getTargetFileSystem(final URI uri)
protected abstract T getTargetFileSystem(URI uri)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
protected abstract T getTargetFileSystem(final INodeDir<T> dir)
protected abstract T getTargetFileSystem(INodeDir<T> dir)
throws URISyntaxException;
protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
throws UnsupportedFileSystemException, URISyntaxException;
protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
/**
* Create Inode Tree from the specified mount-table specified in Config
@ -298,8 +305,9 @@ protected InodeTree(final Configuration config, final String viewName)
final String key = si.getKey();
if (key.startsWith(mtPrefix)) {
gotMountTableEntry = true;
boolean isMergeLink = false;
LinkType linkType = LinkType.SINGLE;
String src = key.substring(mtPrefix.length());
String settings = null;
if (src.startsWith(linkPrefix)) {
src = src.substring(linkPrefix.length());
if (src.equals(SlashPath.toString())) {
@ -309,8 +317,20 @@ protected InodeTree(final Configuration config, final String viewName)
+ "supported yet.");
}
} else if (src.startsWith(linkMergePrefix)) { // A merge link
isMergeLink = true;
linkType = LinkType.MERGE;
src = src.substring(linkMergePrefix.length());
} else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
// prefix.settings.src
src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
// settings.src
settings = src.substring(0, src.indexOf('.'));
// settings
// settings.src
src = src.substring(settings.length() + 1);
// src
linkType = LinkType.NFLY;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) {
// ignore - we set home dir from config
continue;
@ -319,7 +339,7 @@ protected InodeTree(final Configuration config, final String viewName)
"Mount table in config: " + src);
}
final String target = si.getValue(); // link or merge link
createLink(src, target, isMergeLink, ugi);
createLink(src, target, linkType, settings, ugi, config);
}
}
if (!gotMountTableEntry) {

View File

@ -0,0 +1,951 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
/**
* Nfly is a multi filesystem mount point.
*/
@Private
final class NflyFSystem extends FileSystem {
private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
enum NflyKey {
// minimum replication, if local filesystem is included +1 is recommended
minReplication,
// forces to check all the replicas and fetch the one with the most recent
// time stamp
//
readMostRecent,
// create missing replica from far to near, including local?
repairOnRead
}
private static final int DEFAULT_MIN_REPLICATION = 2;
private static URI nflyURI = URI.create("nfly:///");
private final NflyNode[] nodes;
private final int minReplication;
private final EnumSet<NflyKey> nflyFlags;
private final Node myNode;
private final NetworkTopology topology;
/**
* URI's authority is used as an approximation of the distance from the
* client. It's sufficient for DC but not accurate because worker nodes can be
* closer.
*/
private static class NflyNode extends NodeBase {
private final ChRootedFileSystem fs;
NflyNode(String hostName, String rackName, URI uri,
Configuration conf) throws IOException {
this(hostName, rackName, new ChRootedFileSystem(uri, conf));
}
NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
super(hostName, rackName);
this.fs = fs;
}
ChRootedFileSystem getFs() {
return fs;
}
@Override
public boolean equals(Object o) {
// satisfy findbugs
return super.equals(o);
}
@Override
public int hashCode() {
// satisfy findbugs
return super.hashCode();
}
}
private static final class MRNflyNode
extends NflyNode implements Comparable<MRNflyNode> {
private FileStatus status;
private MRNflyNode(NflyNode n) {
super(n.getName(), n.getNetworkLocation(), n.fs);
}
private void updateFileStatus(Path f) throws IOException {
final FileStatus tmpStatus = getFs().getFileStatus(f);
status = tmpStatus == null
? notFoundStatus(f)
: tmpStatus;
}
// TODO allow configurable error margin for FileSystems with different
// timestamp precisions
@Override
public int compareTo(MRNflyNode other) {
if (status == null) {
return other.status == null ? 0 : 1; // move non-null towards head
} else if (other.status == null) {
return -1; // move this towards head
} else {
final long mtime = status.getModificationTime();
final long their = other.status.getModificationTime();
return Long.compare(their, mtime); // move more recent towards head
}
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MRNflyNode)) {
return false;
}
MRNflyNode other = (MRNflyNode) o;
return 0 == compareTo(other);
}
@Override
public int hashCode() {
// satisfy findbugs
return super.hashCode();
}
private FileStatus nflyStatus() throws IOException {
return new NflyStatus(getFs(), status);
}
private FileStatus cloneStatus() throws IOException {
return new FileStatus(status.getLen(),
status.isDirectory(),
status.getReplication(),
status.getBlockSize(),
status.getModificationTime(),
status.getAccessTime(),
null, null, null,
status.isSymlink() ? status.getSymlink() : null,
status.getPath());
}
}
private MRNflyNode[] workSet() {
final MRNflyNode[] res = new MRNflyNode[nodes.length];
for (int i = 0; i < res.length; i++) {
res[i] = new MRNflyNode(nodes[i]);
}
return res;
}
/**
* Utility to replace null with DEFAULT_RACK.
*
* @param rackString rack value, can be null
* @return non-null rack string
*/
private static String getRack(String rackString) {
return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString;
}
/**
* Creates a new Nfly instance.
*
* @param uris the list of uris in the mount point
* @param conf configuration object
* @param minReplication minimum copies to commit a write op
* @param nflyFlags modes such readMostRecent
* @throws IOException
*/
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
EnumSet<NflyKey> nflyFlags) throws IOException {
if (uris.length < minReplication) {
throw new IOException(minReplication + " < " + uris.length
+ ": Minimum replication < #destinations");
}
setConf(conf);
final String localHostName = InetAddress.getLocalHost().getHostName();
// build a list for topology resolution
final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
for (URI uri : uris) {
final String uriHost = uri.getHost();
// assume local file system or another closest filesystem if no authority
hostStrings.add(uriHost == null ? localHostName : uriHost);
}
// resolve the client node
hostStrings.add(localHostName);
final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
// this is an ArrayList
final List<String> rackStrings = tmpDns.resolve(hostStrings);
nodes = new NflyNode[uris.length];
final Iterator<String> rackIter = rackStrings.iterator();
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
conf);
}
// sort all the uri's by distance from myNode, the local file system will
// automatically be the the first one.
//
myNode = new NodeBase(localHostName, getRack(rackIter.next()));
topology = NetworkTopology.getInstance(conf);
topology.sortByDistance(myNode, nodes, nodes.length);
this.minReplication = minReplication;
this.nflyFlags = nflyFlags;
statistics = getStatistics(nflyURI.getScheme(), getClass());
}
/**
* Transactional output stream. When creating path /dir/file
* 1) create invisible /real/dir_i/_nfly_tmp_file
* 2) when more than min replication was written, write is committed by
* renaming all successfully written files to /real/dir_i/file
*/
private final class NflyOutputStream extends OutputStream {
// actual path
private final Path nflyPath;
// tmp path before commit
private final Path tmpPath;
// broadcast set
private final FSDataOutputStream[] outputStreams;
// status set: 1 working, 0 problem
private final BitSet opSet;
private final boolean useOverwrite;
private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
nflyPath = f;
tmpPath = getNflyTmpPath(f);
outputStreams = new FSDataOutputStream[nodes.length];
for (int i = 0; i < outputStreams.length; i++) {
outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
bufferSize, replication, blockSize, progress);
}
opSet = new BitSet(outputStreams.length);
opSet.set(0, outputStreams.length);
useOverwrite = false;
}
//
// TODO consider how to clean up and throw an exception early when the clear
// bits under min replication
//
private void mayThrow(List<IOException> ioExceptions) throws IOException {
final IOException ioe = MultipleIOException
.createIOException(ioExceptions);
if (opSet.cardinality() < minReplication) {
throw ioe;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Exceptions occurred: " + ioe);
}
}
}
@Override
public void write(int d) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >=0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].write(d);
} catch (Throwable t) {
osException(i, "write", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
private void osException(int i, String op, Throwable t,
List<IOException> ioExceptions) {
opSet.clear(i);
processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
}
@Override
public void write(byte[] bytes, int offset, int len) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].write(bytes, offset, len);
} catch (Throwable t) {
osException(i, "write", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
@Override
public void flush() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].flush();
} catch (Throwable t) {
osException(i, "flush", t, ioExceptions);
}
}
mayThrow(ioExceptions);
}
@Override
public void close() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
outputStreams[i].close();
} catch (Throwable t) {
osException(i, "close", t, ioExceptions);
}
}
if (opSet.cardinality() < minReplication) {
cleanupAllTmpFiles();
throw new IOException("Failed to sufficiently replicate: min="
+ minReplication + " actual=" + opSet.cardinality());
} else {
commit();
}
}
private void cleanupAllTmpFiles() throws IOException {
for (int i = 0; i < outputStreams.length; i++) {
try {
nodes[i].fs.delete(tmpPath);
} catch (Throwable t) {
processThrowable(nodes[i], "delete", t, null, tmpPath);
}
}
}
private void commit() throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
final NflyNode nflyNode = nodes[i];
try {
if (useOverwrite) {
nflyNode.fs.delete(nflyPath);
}
nflyNode.fs.rename(tmpPath, nflyPath);
} catch (Throwable t) {
osException(i, "commit", t, ioExceptions);
}
}
if (opSet.cardinality() < minReplication) {
// cleanup should be done outside. If rename failed, it's unlikely that
// delete will work either. It's the same kind of metadata-only op
//
throw MultipleIOException.createIOException(ioExceptions);
}
// best effort to have a consistent timestamp
final long commitTime = System.currentTimeMillis();
for (int i = opSet.nextSetBit(0);
i >= 0;
i = opSet.nextSetBit(i + 1)) {
try {
nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
} catch (Throwable t) {
LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
}
}
}
}
private Path getNflyTmpPath(Path f) {
return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
}
/**
* // TODO
* Some file status implementations have expensive deserialization or metadata
* retrieval. This probably does not go beyond RawLocalFileSystem. Wrapping
* the the real file status to preserve this behavior. Otherwise, calling
* realStatus getters in constructor defeats this design.
*/
static final class NflyStatus extends FileStatus {
private static final long serialVersionUID = 0x21f276d8;
private final FileStatus realStatus;
private final String strippedRoot;
private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
throws IOException {
this.realStatus = realStatus;
this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
}
String stripRoot() throws IOException {
return strippedRoot;
}
@Override
public long getLen() {
return realStatus.getLen();
}
@Override
public boolean isFile() {
return realStatus.isFile();
}
@Override
public boolean isDirectory() {
return realStatus.isDirectory();
}
@Override
public boolean isSymlink() {
return realStatus.isSymlink();
}
@Override
public long getBlockSize() {
return realStatus.getBlockSize();
}
@Override
public short getReplication() {
return realStatus.getReplication();
}
@Override
public long getModificationTime() {
return realStatus.getModificationTime();
}
@Override
public long getAccessTime() {
return realStatus.getAccessTime();
}
@Override
public FsPermission getPermission() {
return realStatus.getPermission();
}
@Override
public String getOwner() {
return realStatus.getOwner();
}
@Override
public String getGroup() {
return realStatus.getGroup();
}
@Override
public Path getPath() {
return realStatus.getPath();
}
@Override
public void setPath(Path p) {
realStatus.setPath(p);
}
@Override
public Path getSymlink() throws IOException {
return realStatus.getSymlink();
}
@Override
public void setSymlink(Path p) {
realStatus.setSymlink(p);
}
@Override
public boolean equals(Object o) {
return realStatus.equals(o);
}
@Override
public int hashCode() {
return realStatus.hashCode();
}
@Override
public String toString() {
return realStatus.toString();
}
}
@Override
public URI getUri() {
return nflyURI;
}
/**
* Category: READ.
*
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
* @return input stream according to nfly flags (closest, most recent)
* @throws IOException
* @throws FileNotFoundException iff all destinations generate this exception
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
// TODO proxy stream for reads
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
int numNotFounds = 0;
final MRNflyNode[] mrNodes = workSet();
// naively iterate until one can be opened
//
for (final MRNflyNode nflyNode : mrNodes) {
try {
if (nflyFlags.contains(NflyKey.repairOnRead)
|| nflyFlags.contains(NflyKey.readMostRecent)) {
// calling file status to avoid pulling bytes prematurely
nflyNode.updateFileStatus(f);
} else {
return nflyNode.getFs().open(f, bufferSize);
}
} catch (FileNotFoundException fnfe) {
nflyNode.status = notFoundStatus(f);
numNotFounds++;
processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "open", t, ioExceptions, f);
}
}
if (nflyFlags.contains(NflyKey.readMostRecent)) {
// sort from most recent to least recent
Arrays.sort(mrNodes);
}
final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
bufferSize);
if (fsdisAfterRepair != null) {
return fsdisAfterRepair;
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
private static FileStatus notFoundStatus(Path f) {
return new FileStatus(-1, false, 0, 0, 0, f);
}
/**
* Iterate all available nodes in the proximity order to attempt repair of all
* FileNotFound nodes.
*
* @param mrNodes work set copy of nodes
* @param f path to repair and open
* @param bufferSize buffer size for read RPC
* @return the closest/most recent replica stream AFTER repair
*/
private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
int bufferSize) {
long maxMtime = 0L;
for (final MRNflyNode srcNode : mrNodes) {
if (srcNode.status == null // not available
|| srcNode.status.getLen() < 0L) { // not found
continue; // not available
}
if (srcNode.status.getModificationTime() > maxMtime) {
maxMtime = srcNode.status.getModificationTime();
}
// attempt to repair all notFound nodes with srcNode
//
for (final MRNflyNode dstNode : mrNodes) {
if (dstNode.status == null // not available
|| srcNode.compareTo(dstNode) == 0) { // same mtime
continue;
}
try {
// status is absolute from the underlying mount, making it chrooted
//
final FileStatus srcStatus = srcNode.cloneStatus();
srcStatus.setPath(f);
final Path tmpPath = getNflyTmpPath(f);
FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
false, // don't delete
true, // overwrite
getConf());
dstNode.getFs().delete(f, false);
if (dstNode.getFs().rename(tmpPath, f)) {
try {
dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
srcNode.status.getAccessTime());
} finally {
// save getFileStatus rpc
srcStatus.setPath(dstNode.getFs().makeQualified(f));
dstNode.status = srcStatus;
}
}
} catch (IOException ioe) {
// can blame the source by statusSet.clear(ai), however, it would
// cost an extra RPC, so just rely on the loop below that will attempt
// an open anyhow
//
LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
ioe);
}
}
}
// Since Java7, QuickSort is used instead of MergeSort.
// QuickSort may not be stable and thus the equal most recent nodes, may no
// longer appear in the NetworkTopology order.
//
if (maxMtime > 0) {
final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
for (final MRNflyNode openNode : mrNodes) {
if (openNode.status != null && openNode.status.getLen() >= 0L) {
if (openNode.status.getModificationTime() == maxMtime) {
mrList.add(openNode);
}
}
}
// assert mrList.size > 0
final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
topology.sortByDistance(myNode, readNodes, readNodes.length);
for (final MRNflyNode rNode : readNodes) {
try {
return rNode.getFs().open(f, bufferSize);
} catch (IOException e) {
LOG.info(f + ": Failed to open at " + rNode.getFs().getUri());
}
}
}
return null;
}
private void mayThrowFileNotFound(List<IOException> ioExceptions,
int numNotFounds) throws FileNotFoundException {
if (numNotFounds == nodes.length) {
throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
}
}
// WRITE
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite,
bufferSize, replication, blockSize, progress), statistics);
}
// WRITE
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
return null;
}
// WRITE
@Override
public boolean rename(Path src, Path dst) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
int numNotFounds = 0;
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
try {
succ &= nflyNode.fs.rename(src, dst);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst);
} catch (Throwable t) {
processThrowable(nflyNode, "rename", t, ioExceptions, src, dst);
succ = false;
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
// if all destinations threw exceptions throw, otherwise return
//
if (ioExceptions.size() == nodes.length) {
throw MultipleIOException.createIOException(ioExceptions);
}
return succ;
}
// WRITE
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
final List<IOException> ioExceptions = new ArrayList<IOException>();
int numNotFounds = 0;
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
try {
succ &= nflyNode.fs.delete(f);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "delete", t, ioExceptions, f);
succ = false;
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
// if all destinations threw exceptions throw, otherwise return
//
if (ioExceptions.size() == nodes.length) {
throw MultipleIOException.createIOException(ioExceptions);
}
return succ;
}
/**
* Returns the closest non-failing destination's result.
*
* @param f given path
* @return array of file statuses according to nfly modes
* @throws FileNotFoundException
* @throws IOException
*/
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
final MRNflyNode[] mrNodes = workSet();
if (nflyFlags.contains(NflyKey.readMostRecent)) {
int numNotFounds = 0;
for (final MRNflyNode nflyNode : mrNodes) {
try {
nflyNode.updateFileStatus(f);
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
Arrays.sort(mrNodes);
}
int numNotFounds = 0;
for (final MRNflyNode nflyNode : mrNodes) {
try {
final FileStatus[] realStats = nflyNode.getFs().listStatus(f);
final FileStatus[] nflyStats = new FileStatus[realStats.length];
for (int i = 0; i < realStats.length; i++) {
nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]);
}
return nflyStats;
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
}
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws FileNotFoundException, IOException {
// TODO important for splits
return super.listLocatedStatus(f);
}
@Override
public void setWorkingDirectory(Path newDir) {
for (final NflyNode nflyNode : nodes) {
nflyNode.fs.setWorkingDirectory(newDir);
}
}
@Override
public Path getWorkingDirectory() {
return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
boolean succ = true;
for (final NflyNode nflyNode : nodes) {
succ &= nflyNode.fs.mkdirs(f, permission);
}
return succ;
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
// TODO proxy stream for reads
final List<IOException> ioExceptions =
new ArrayList<IOException>(nodes.length);
int numNotFounds = 0;
final MRNflyNode[] mrNodes = workSet();
long maxMtime = Long.MIN_VALUE;
int maxMtimeIdx = Integer.MIN_VALUE;
// naively iterate until one can be returned
//
for (int i = 0; i < mrNodes.length; i++) {
MRNflyNode nflyNode = mrNodes[i];
try {
nflyNode.updateFileStatus(f);
if (nflyFlags.contains(NflyKey.readMostRecent)) {
final long nflyTime = nflyNode.status.getModificationTime();
if (nflyTime > maxMtime) {
maxMtime = nflyTime;
maxMtimeIdx = i;
}
} else {
return nflyNode.nflyStatus();
}
} catch (FileNotFoundException fnfe) {
numNotFounds++;
processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f);
} catch (Throwable t) {
processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f);
}
}
if (maxMtimeIdx >= 0) {
return mrNodes[maxMtimeIdx].nflyStatus();
}
mayThrowFileNotFound(ioExceptions, numNotFounds);
throw MultipleIOException.createIOException(ioExceptions);
}
private static void processThrowable(NflyNode nflyNode, String op,
Throwable t, List<IOException> ioExceptions,
Path... f) {
final String errMsg = Arrays.toString(f)
+ ": failed to " + op + " " + nflyNode.fs.getUri();
final IOException ioex;
if (t instanceof FileNotFoundException) {
ioex = new FileNotFoundException(errMsg);
ioex.initCause(t);
} else {
ioex = new IOException(errMsg, t);
}
if (ioExceptions != null) {
ioExceptions.add(ioex);
}
}
/**
* Initializes an nfly mountpoint in viewfs.
*
* @param uris destinations to replicate writes to
* @param conf file system configuration
* @param settings comma-separated list of k=v pairs.
* @return an Nfly filesystem
* @throws IOException
*/
static FileSystem createFileSystem(URI[] uris, Configuration conf,
String settings) throws IOException {
// assert settings != null
int minRepl = DEFAULT_MIN_REPLICATION;
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
final String[] kvPairs = StringUtils.split(settings);
for (String kv : kvPairs) {
final String[] kvPair = StringUtils.split(kv, '=');
if (kvPair.length != 2) {
throw new IllegalArgumentException(kv);
}
NflyKey nflyKey = NflyKey.valueOf(kvPair[0]);
switch (nflyKey) {
case minReplication:
minRepl = Integer.parseInt(kvPair[1]);
break;
case repairOnRead:
case readMostRecent:
if (Boolean.valueOf(kvPair[1])) {
nflyFlags.add(nflyKey);
}
break;
default:
throw new IllegalArgumentException(nflyKey + ": Infeasible");
}
}
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
}
}

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@ -186,25 +185,21 @@ public void initialize(final URI theUri, final Configuration conf)
fsState = new InodeTree<FileSystem>(conf, authority) {
@Override
protected
FileSystem getTargetFileSystem(final URI uri)
protected FileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, IOException {
return new ChRootedFileSystem(uri, config);
}
@Override
protected
FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
throws URISyntaxException {
return new InternalDirOfViewFs(dir, creationTime, ugi, myUri, config);
}
@Override
protected
FileSystem getTargetFileSystem(URI[] mergeFsURIList)
throws URISyntaxException, UnsupportedFileSystemException {
throw new UnsupportedFileSystemException("mergefs not implemented");
// return MergeFs.createMergeFs(mergeFsURIList, config);
protected FileSystem getTargetFileSystem(final String settings,
final URI[] uris) throws URISyntaxException, IOException {
return NflyFSystem.createFileSystem(uris, config, settings);
}
};
workingDir = this.getHomeDirectory();
@ -455,8 +450,13 @@ public LocatedFileStatus next() throws IOException {
private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
FileStatus status, Path f) throws IOException {
final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
.stripOutRoot(status.getPath());
final String suffix;
if (res.targetFileSystem instanceof ChRootedFileSystem) {
suffix = ((ChRootedFileSystem)res.targetFileSystem)
.stripOutRoot(status.getPath());
} else { // nfly
suffix = ((NflyFSystem.NflyStatus)status).stripRoot();
}
return this.makeQualified(
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
}
@ -501,10 +501,15 @@ public boolean rename(final Path src, final Path dst) throws IOException {
verifyRenameStrategy(srcUri, dstUri,
resSrc.targetFileSystem == resDst.targetFileSystem, renameStrategy);
ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
dstFS.fullPath(resDst.remainingPath));
if (resSrc.targetFileSystem instanceof ChRootedFileSystem &&
resDst.targetFileSystem instanceof ChRootedFileSystem) {
ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
dstFS.fullPath(resDst.remainingPath));
} else {
return resSrc.targetFileSystem.rename(resSrc.remainingPath, resDst.remainingPath);
}
}
static void verifyRenameStrategy(URI srcUri, URI dstUri,

View File

@ -212,8 +212,7 @@ public ViewFs(final Configuration conf) throws IOException,
fsState = new InodeTree<AbstractFileSystem>(conf, authority) {
@Override
protected
AbstractFileSystem getTargetFileSystem(final URI uri)
protected AbstractFileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, UnsupportedFileSystemException {
String pathString = uri.getPath();
if (pathString.isEmpty()) {
@ -225,15 +224,14 @@ AbstractFileSystem getTargetFileSystem(final URI uri)
}
@Override
protected
AbstractFileSystem getTargetFileSystem(
protected AbstractFileSystem getTargetFileSystem(
final INodeDir<AbstractFileSystem> dir) throws URISyntaxException {
return new InternalDirOfViewFs(dir, creationTime, ugi, getUri());
}
@Override
protected
AbstractFileSystem getTargetFileSystem(URI[] mergeFsURIList)
protected AbstractFileSystem getTargetFileSystem(final String settings,
final URI[] mergeFsURIList)
throws URISyntaxException, UnsupportedFileSystemException {
throw new UnsupportedFileSystemException("mergefs not implemented yet");
// return MergeFs.createMergeFs(mergeFsURIList, config);

View File

@ -18,13 +18,25 @@
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
@ -37,6 +49,8 @@
*/
public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
private static final Log LOG =
LogFactory.getLog(TestViewFileSystemLocalFileSystem.class);
@Override
@Before
@ -47,6 +61,65 @@ public void setUp() throws Exception {
}
@Test
public void testNflyWriteSimple() throws IOException {
LOG.info("Starting testNflyWriteSimple");
final URI[] testUris = new URI[] {
URI.create(targetTestRoot + "/nfwd1"),
URI.create(targetTestRoot + "/nfwd2")
};
final String testFileName = "test.txt";
final Configuration testConf = new Configuration(conf);
final String testString = "Hello Nfly!";
final Path nflyRoot = new Path("/nflyroot");
ConfigUtil.addLinkNfly(testConf, nflyRoot.toString(), testUris);
final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
final FSDataOutputStream fsDos = nfly.create(
new Path(nflyRoot, "test.txt"));
try {
fsDos.writeUTF(testString);
} finally {
fsDos.close();
}
FileStatus[] statuses = nfly.listStatus(nflyRoot);
FileSystem lfs = FileSystem.getLocal(testConf);
for (final URI testUri : testUris) {
final Path testFile = new Path(new Path(testUri), testFileName);
assertTrue(testFile + " should exist!", lfs.exists(testFile));
final FSDataInputStream fsdis = lfs.open(testFile);
try {
assertEquals("Wrong file content", testString, fsdis.readUTF());
} finally {
fsdis.close();
}
}
}
@Test
public void testNflyInvalidMinReplication() throws Exception {
LOG.info("Starting testNflyInvalidMinReplication");
final URI[] testUris = new URI[] {
URI.create(targetTestRoot + "/nfwd1"),
URI.create(targetTestRoot + "/nfwd2")
};
final Configuration conf = new Configuration();
ConfigUtil.addLinkNfly(conf, "mt", "/nflyroot", "minReplication=4",
testUris);
try {
FileSystem.get(URI.create("viewfs://mt/"), conf);
fail("Expected bad minReplication exception.");
} catch (IOException ioe) {
assertTrue("No minReplication message",
ioe.getMessage().contains("Minimum replication"));
}
}
@Override
@After
public void tearDown() throws Exception {

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.junit.Test;
public class TestViewFsConfig {
@ -43,23 +42,21 @@ class Foo {
new InodeTree<Foo>(conf, null) {
@Override
protected Foo getTargetFileSystem(final URI uri)
throws URISyntaxException, UnsupportedFileSystemException {
protected Foo getTargetFileSystem(final URI uri) {
return null;
}
@Override
protected Foo getTargetFileSystem(
org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo> dir)
throws URISyntaxException {
protected Foo getTargetFileSystem(final INodeDir<Foo> dir) {
return null;
}
@Override
protected Foo getTargetFileSystem(URI[] mergeFsURIList)
throws URISyntaxException, UnsupportedFileSystemException {
protected Foo getTargetFileSystem(final String settings,
final URI[] mergeFsURIList) {
return null;
}
};
}

View File

@ -17,11 +17,9 @@
*/
package org.apache.hadoop.fs.viewfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
@ -31,6 +29,8 @@
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -45,17 +45,26 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestViewFileSystemHdfs.class);
private static MiniDFSCluster cluster;
private static Path defaultWorkingDirectory;
@ -190,12 +199,12 @@ public void testTrashRootsAfterEncryptionZoneDeletion() throws Exception {
//Verify file deletion within EZ
DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true);
Assert.assertTrue("ViewFileSystem trash roots should include EZ file trash",
assertTrue("ViewFileSystem trash roots should include EZ file trash",
(fsView.getTrashRoots(true).size() == 1));
//Verify deletion of EZ
DFSTestUtil.verifyDelete(shell, fsTarget, zone, true);
Assert.assertTrue("ViewFileSystem trash roots should include EZ zone trash",
assertTrue("ViewFileSystem trash roots should include EZ zone trash",
(fsView.getTrashRoots(true).size() == 2));
}
@ -239,14 +248,14 @@ public void testFileChecksum() throws IOException {
viewFs.getFileChecksum(mountDataFilePath);
FileChecksum fileChecksumViaTargetFs =
fsTarget.getFileChecksum(fsTargetFilePath);
Assert.assertTrue("File checksum not matching!",
assertTrue("File checksum not matching!",
fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
fileChecksumViaViewFs =
viewFs.getFileChecksum(mountDataFilePath, fileLength / 2);
fileChecksumViaTargetFs =
fsTarget.getFileChecksum(fsTargetFilePath, fileLength / 2);
Assert.assertTrue("File checksum not matching!",
assertTrue("File checksum not matching!",
fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
}
@ -269,4 +278,130 @@ public void testRenameAccorssFilesystem() throws IOException {
e);
}
}
@Test
public void testNflyClosestRepair() throws Exception {
testNflyRepair(NflyFSystem.NflyKey.repairOnRead);
}
@Test
public void testNflyMostRecentRepair() throws Exception {
testNflyRepair(NflyFSystem.NflyKey.readMostRecent);
}
private void testNflyRepair(NflyFSystem.NflyKey repairKey)
throws Exception {
LOG.info("Starting testNflyWriteSimpleFailover");
final URI uri1 = targetTestRoot.toUri();
final URI uri2 = targetTestRoot2.toUri();
final URI[] testUris = new URI[] {
new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null),
new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null)
};
final Configuration testConf = new Configuration(conf);
testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
final String testString = "Hello Nfly!";
final Path nflyRoot = new Path("/nflyroot");
ConfigUtil.addLinkNfly(testConf,
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
nflyRoot.toString(),
"minReplication=2," + repairKey + "=true", testUris);
final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
// wd = /nflyroot/user/<user>
nfly.setWorkingDirectory(new Path(nflyRoot
+ nfly.getWorkingDirectory().toUri().getPath()));
// 1. test mkdirs
final Path testDir = new Path("testdir1/sub1/sub3");
final Path testDir_tmp = new Path("testdir1/sub1/sub3_temp");
assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
// Test renames
assertTrue(nfly.rename(testDir, testDir_tmp));
assertTrue(nfly.rename(testDir_tmp, testDir));
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, testConf);
assertTrue(testDir + " should exist!", fs.exists(testDir));
}
// 2. test write
final Path testFile = new Path("test.txt");
final FSDataOutputStream fsDos = nfly.create(testFile);
try {
fsDos.writeUTF(testString);
} finally {
fsDos.close();
}
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, testConf);
final FSDataInputStream fsdis = fs.open(testFile);
try {
assertEquals("Wrong file content", testString, fsdis.readUTF());
} finally {
fsdis.close();
}
}
// 3. test reads when one unavailable
//
// bring one NN down and read through nfly should still work
//
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
cluster.shutdownNameNode(i);
FSDataInputStream fsDis = null;
try {
fsDis = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis.readUTF());
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis);
cluster.restartNameNode(i);
}
}
// both nodes are up again, test repair
final FileSystem fs1 = FileSystem.get(testUris[0], conf);
assertTrue(fs1.delete(testFile, false));
assertFalse(fs1.exists(testFile));
FSDataInputStream fsDis = null;
try {
fsDis = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis.readUTF());
assertTrue(fs1.exists(testFile));
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis);
}
// test most recent repair
if (repairKey == NflyFSystem.NflyKey.readMostRecent) {
final FileSystem fs2 = FileSystem.get(testUris[0], conf);
final long expectedMtime = fs2.getFileStatus(testFile)
.getModificationTime();
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, conf);
fs.setTimes(testFile, 1L, 1L);
assertEquals(testUri + "Set mtime failed!", 1L,
fs.getFileStatus(testFile).getModificationTime());
assertEquals("nfly file status wrong", expectedMtime,
nfly.getFileStatus(testFile).getModificationTime());
FSDataInputStream fsDis2 = null;
try {
fsDis2 = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis2.readUTF());
// repair is done, now trying via normal fs
//
assertEquals("Repair most recent failed!", expectedMtime,
fs.getFileStatus(testFile).getModificationTime());
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis2);
}
}
}
}
}