HDFS-11640. [READ] Datanodes should use a unique identifier when reading from external stores

Virajith Jalaparti 2017-12-06 09:39:56 -08:00 committed by Chris Douglas
parent fb996a32a9
commit 4531588a94
12 changed files with 174 additions and 37 deletions
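
At a high level, the change threads a filesystem PathHandle through the provided-storage read path: the image writer serializes the handle into the FileRegion nonce, and the datanode rebuilds a RawPathHandle from that nonce so it can open the remote file by handle rather than by path. A minimal sketch of that round trip, built only from the APIs visible in the diffs below; the helper class and method names are illustrative, not from this commit:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawPathHandle;

/** Illustrative helper, not part of the commit. */
class PathHandleRoundTrip {

  // Writer side (TreePath/ImageWriter): ask the remote filesystem for a
  // handle and store its serialized bytes as the FileRegion nonce; an empty
  // nonce means the filesystem could not issue one.
  static byte[] toNonce(FileSystem fs, FileStatus stat) throws IOException {
    try {
      PathHandle handle = fs.getPathHandle(stat, Options.HandleOpt.exact());
      return handle.toByteArray();
    } catch (UnsupportedOperationException e) {
      return new byte[0];
    }
  }

  // Reader side (ProvidedVolumeImpl/ProvidedReplica): rebuild the handle from
  // the nonce and open by handle, falling back to opening by path when no
  // nonce was recorded.
  static FSDataInputStream open(FileSystem fs, byte[] nonce, Path path,
      int bufferSize) throws IOException {
    if (nonce != null && nonce.length > 0) {
      PathHandle handle = new RawPathHandle(ByteBuffer.wrap(nonce));
      return fs.open(handle, bufferSize);
    }
    return fs.open(path);
  }
}

The files on this page wire those two halves together: TreePath, ImageWriter, and the text alias map on the write side; ProvidedVolumeImpl, ReplicaBuilder, and ProvidedReplica on the read side.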

View File

@@ -37,8 +37,13 @@ public class FileRegion implements BlockAlias {
public FileRegion(long blockId, Path path, long offset,
long length, long genStamp) {
this(blockId, path, offset, length, genStamp, new byte[0]);
}
public FileRegion(long blockId, Path path, long offset,
long length, long genStamp, byte[] nonce) {
this(new Block(blockId, length, genStamp),
new ProvidedStorageLocation(path, offset, length, new byte[0]));
new ProvidedStorageLocation(path, offset, length, nonce));
}
public FileRegion(long blockId, Path path, long offset, long length) {

View File

@@ -26,6 +26,7 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
@@ -353,11 +354,16 @@ private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
return null;
}
String[] f = line.split(delim);
if (f.length != 5) {
if (f.length != 5 && f.length != 6) {
throw new IOException("Invalid line: " + line);
}
byte[] nonce = new byte[0];
if (f.length == 6) {
nonce = f[5].getBytes(Charset.forName("UTF-8"));
}
return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
Long.parseLong(f[2]), Long.parseLong(f[3]), Long.parseLong(f[4]));
Long.parseLong(f[2]), Long.parseLong(f[3]), Long.parseLong(f[4]),
nonce);
}
public InputStream createStream() throws IOException {
@@ -442,7 +448,11 @@ public void store(FileRegion token) throws IOException {
out.append(psl.getPath().toString()).append(delim);
out.append(Long.toString(psl.getOffset())).append(delim);
out.append(Long.toString(psl.getLength())).append(delim);
out.append(Long.toString(block.getGenerationStamp())).append(delim);
out.append(Long.toString(block.getGenerationStamp()));
if (psl.getNonce().length > 0) {
out.append(delim)
.append(new String(psl.getNonce(), Charset.forName("UTF-8")));
}
out.append("\n");
}
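
For reference, each record of the text alias map is one delimited line: blockId, path, offset, length, genStamp, and now an optional sixth field holding the nonce as UTF-8 text; store() above only emits the sixth field when a nonce is present, and nextInternal() accepts both the five-field and six-field shapes. A small illustrative sketch; the sample values and the default "," delimiter are assumptions, not taken from this commit:

import java.nio.charset.StandardCharsets;

/** Illustrative only: the on-disk shape of text alias-map records. */
class AliasMapRecordExample {
  public static void main(String[] args) {
    // Field order matches store() above; the delimiter is configurable and
    // assumed here to be the default ",":
    //   blockId,path,offset,length,genStamp[,nonce]
    String legacy    = "1001,file:/data/part-0000,0,134217728,1001";
    String withNonce = "1001,file:/data/part-0000,0,134217728,1001,AAABBBCC";

    // Mirror the 5-vs-6 field handling in nextInternal() above.
    for (String line : new String[] {legacy, withNonce}) {
      String[] f = line.split(",");
      byte[] nonce = f.length == 6
          ? f[5].getBytes(StandardCharsets.UTF_8)
          : new byte[0];
      System.out.println(f[0] + ": nonce is " + nonce.length + " bytes");
    }
  }
}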

View File

@@ -18,12 +18,15 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.net.URI;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawPathHandle;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -36,11 +39,11 @@
@InterfaceStability.Unstable
public class FinalizedProvidedReplica extends ProvidedReplica {
public FinalizedProvidedReplica(long blockId, URI fileURI,
long fileOffset, long blockLen, long genStamp,
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf,
remoteFS);
public FinalizedProvidedReplica(long blockId, URI fileURI, long fileOffset,
long blockLen, long genStamp, PathHandle pathHandle, FsVolumeSpi volume,
Configuration conf, FileSystem remoteFS) {
super(blockId, fileURI, fileOffset, blockLen, genStamp, pathHandle, volume,
conf, remoteFS);
}
public FinalizedProvidedReplica(FileRegion fileRegion, FsVolumeSpi volume,
@@ -50,14 +53,17 @@ public FinalizedProvidedReplica(FileRegion fileRegion, FsVolumeSpi volume,
fileRegion.getProvidedStorageLocation().getOffset(),
fileRegion.getBlock().getNumBytes(),
fileRegion.getBlock().getGenerationStamp(),
new RawPathHandle(ByteBuffer
.wrap(fileRegion.getProvidedStorageLocation().getNonce())),
volume, conf, remoteFS);
}
public FinalizedProvidedReplica(long blockId, Path pathPrefix,
String pathSuffix, long fileOffset, long blockLen, long genStamp,
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
PathHandle pathHandle, FsVolumeSpi volume, Configuration conf,
FileSystem remoteFS) {
super(blockId, pathPrefix, pathSuffix, fileOffset, blockLen,
genStamp, volume, conf, remoteFS);
genStamp, pathHandle, volume, conf, remoteFS);
}
@Override

View File

@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
@@ -41,6 +42,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/**
* This abstract class is used as a base class for provided replicas.
*/
@@ -60,6 +64,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
private String pathSuffix;
private long fileOffset;
private Configuration conf;
private PathHandle pathHandle;
private FileSystem remoteFS;
/**
@@ -75,12 +80,13 @@ public abstract class ProvidedReplica extends ReplicaInfo {
* @param remoteFS reference to the remote filesystem to use for this replica.
*/
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
FileSystem remoteFS) {
long blockLen, long genStamp, PathHandle pathHandle, FsVolumeSpi volume,
Configuration conf, FileSystem remoteFS) {
super(volume, blockId, blockLen, genStamp);
this.fileURI = fileURI;
this.fileOffset = fileOffset;
this.conf = conf;
this.pathHandle = pathHandle;
if (remoteFS != null) {
this.remoteFS = remoteFS;
} else {
@@ -114,14 +120,15 @@ public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
* @param remoteFS reference to the remote filesystem to use for this replica.
*/
public ProvidedReplica(long blockId, Path pathPrefix, String pathSuffix,
long fileOffset, long blockLen, long genStamp, FsVolumeSpi volume,
Configuration conf, FileSystem remoteFS) {
long fileOffset, long blockLen, long genStamp, PathHandle pathHandle,
FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
super(volume, blockId, blockLen, genStamp);
this.fileURI = null;
this.pathPrefix = pathPrefix;
this.pathSuffix = pathSuffix;
this.fileOffset = fileOffset;
this.conf = conf;
this.pathHandle = pathHandle;
if (remoteFS != null) {
this.remoteFS = remoteFS;
} else {
@@ -142,6 +149,7 @@ public ProvidedReplica(ProvidedReplica r) {
this.fileOffset = r.fileOffset;
this.conf = r.conf;
this.remoteFS = r.remoteFS;
this.pathHandle = r.pathHandle;
this.pathPrefix = r.pathPrefix;
this.pathSuffix = r.pathSuffix;
}
@@ -174,7 +182,18 @@ private URI getRemoteURI() {
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
if (remoteFS != null) {
FSDataInputStream ins = remoteFS.open(new Path(getRemoteURI()));
FSDataInputStream ins;
try {
if (pathHandle != null) {
ins = remoteFS.open(pathHandle, conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT));
} else {
ins = remoteFS.open(new Path(getRemoteURI()));
}
} catch (UnsupportedOperationException e) {
throw new IOException("PathHandle specified, but unsuported", e);
}
ins.seek(fileOffset + seekOffset);
return new BoundedInputStream(
new FSDataInputStream(ins), getBlockDataLength());
@@ -324,4 +343,9 @@ public void copyBlockdata(URI destination) throws IOException {
throw new UnsupportedOperationException(
"ProvidedReplica does not yet support copy data");
}
@VisibleForTesting
public void setPathHandle(PathHandle pathHandle) {
this.pathHandle = pathHandle;
}
}

View File

@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -53,6 +54,7 @@ public class ReplicaBuilder {
private Configuration conf;
private FileRegion fileRegion;
private FileSystem remoteFS;
private PathHandle pathHandle;
private String pathSuffix;
private Path pathPrefix;
@@ -66,6 +68,7 @@ public ReplicaBuilder(ReplicaState state) {
fromReplica = null;
uri = null;
this.state = state;
pathHandle = null;
}
public ReplicaBuilder setState(ReplicaState state) {
@@ -170,6 +173,11 @@ public ReplicaBuilder setPathPrefix(Path prefix) {
return this;
}
public ReplicaBuilder setPathHandle(PathHandle pathHandle) {
this.pathHandle = pathHandle;
return this;
}
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
throws IllegalArgumentException {
LocalReplicaInPipeline info = null;
@@ -309,10 +317,10 @@ private ProvidedReplica buildProvidedFinalizedReplica()
if (fileRegion == null) {
if (uri != null) {
info = new FinalizedProvidedReplica(blockId, uri, offset,
length, genStamp, volume, conf, remoteFS);
length, genStamp, pathHandle, volume, conf, remoteFS);
} else {
info = new FinalizedProvidedReplica(blockId, pathPrefix, pathSuffix,
offset, length, genStamp, volume, conf, remoteFS);
offset, length, genStamp, pathHandle, volume, conf, remoteFS);
}
} else {
info = new FinalizedProvidedReplica(fileRegion, volume, conf, remoteFS);

View File

@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
@@ -32,6 +33,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawPathHandle;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -180,6 +183,11 @@ void fetchVolumeMap(ReplicaMap volumeMap,
region.getProvidedStorageLocation().getPath().toUri())) {
String blockSuffix = getSuffix(blockPrefixPath,
new Path(region.getProvidedStorageLocation().getPath().toUri()));
PathHandle pathHandle = null;
if (region.getProvidedStorageLocation().getNonce().length > 0) {
pathHandle = new RawPathHandle(ByteBuffer
.wrap(region.getProvidedStorageLocation().getNonce()));
}
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
.setBlockId(region.getBlock().getBlockId())
.setPathPrefix(blockPrefixPath)
@@ -187,6 +195,7 @@ void fetchVolumeMap(ReplicaMap volumeMap,
.setOffset(region.getProvidedStorageLocation().getOffset())
.setLength(region.getBlock().getNumBytes())
.setGenerationStamp(region.getBlock().getGenerationStamp())
.setPathHandle(pathHandle)
.setFsVolume(providedVolume)
.setConf(conf)
.setRemoteFS(remoteFS)

View File

@@ -87,7 +87,7 @@ private static void createProvidedReplicas(Configuration conf) {
FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
replicas.add(
new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
currentReplicaLength, 0, null, conf, null));
currentReplicaLength, 0, null, null, conf, null));
}
}

View File

@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -44,14 +46,20 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -63,6 +71,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@@ -71,6 +80,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
@@ -619,4 +629,51 @@ public void testScannerWithProvidedVolumes() throws Exception {
assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
}
/**
* Tests that a ProvidedReplica supports path handles.
*
* @throws Exception
*/
@Test
public void testProvidedReplicaWithPathHandle() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
// generate random data
int chunkSize = 512;
Random r = new Random(12345L);
byte[] data = new byte[chunkSize];
r.nextBytes(data);
Path file = new Path("/testfile");
try (FSDataOutputStream fout = fs.create(file)) {
fout.write(data);
}
PathHandle pathHandle = fs.getPathHandle(fs.getFileStatus(file),
Options.HandleOpt.changed(true), Options.HandleOpt.moved(true));
FinalizedProvidedReplica replica = new FinalizedProvidedReplica(0,
file.toUri(), 0, chunkSize, 0, pathHandle, null, conf, fs);
byte[] content = new byte[chunkSize];
IOUtils.readFully(replica.getDataInputStream(0), content, 0, chunkSize);
assertArrayEquals(data, content);
fs.rename(file, new Path("/testfile.1"));
// read should continue succeeding after the rename operation
IOUtils.readFully(replica.getDataInputStream(0), content, 0, chunkSize);
assertArrayEquals(data, content);
replica.setPathHandle(null);
try {
// expected to fail as URI of the provided replica is no longer valid.
replica.getDataInputStream(0);
fail("Expected an exception");
} catch (IOException e) {
LOG.info("Expected exception " + e);
}
}
}

View File

@@ -55,7 +55,7 @@ protected Iterable<TreePath> getChildren(TreePath path, long id,
try {
ArrayList<TreePath> ret = new ArrayList<>();
for (FileStatus s : fs.listStatus(path.getFileStatus().getPath())) {
ret.add(new TreePath(s, id, i));
ret.add(new TreePath(s, id, i, fs));
}
return ret;
} catch (FileNotFoundException e) {
@@ -72,13 +72,13 @@ private FSTreeIterator() {
FSTreeIterator(TreePath p) {
getPendingQueue().addFirst(
new TreePath(p.getFileStatus(), p.getParentId(), this));
new TreePath(p.getFileStatus(), p.getParentId(), this, fs));
}
FSTreeIterator(Path p) throws IOException {
try {
FileStatus s = fs.getFileStatus(root);
getPendingQueue().addFirst(new TreePath(s, -1L, this));
getPendingQueue().addFirst(new TreePath(s, -1L, this, fs));
} catch (FileNotFoundException e) {
if (p.equals(root)) {
throw e;

View File

@@ -208,7 +208,7 @@ public void accept(TreePath e) throws IOException {
long id = curInode.getAndIncrement();
e.accept(id);
assert e.getId() < curInode.get();
INode n = e.toINode(ugis, blockIds, blocks, blockPoolID);
INode n = e.toINode(ugis, blockIds, blocks);
writeInode(n);
if (e.getParentId() > 0) {

View File

@@ -24,6 +24,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.server.common.FileRegion;
@@ -31,6 +34,8 @@
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeFile;
import static org.apache.hadoop.hdfs.DFSUtil.LOG;
import static org.apache.hadoop.hdfs.DFSUtil.string2Bytes;
import static org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.DEFAULT_STORAGE_SPACE_QUOTA;
@@ -46,11 +51,14 @@ public class TreePath {
private final long parentId;
private final FileStatus stat;
private final TreeWalk.TreeIterator i;
private final FileSystem fs;
protected TreePath(FileStatus stat, long parentId, TreeWalk.TreeIterator i) {
protected TreePath(FileStatus stat, long parentId, TreeWalk.TreeIterator i,
FileSystem fs) {
this.i = i;
this.stat = stat;
this.parentId = parentId;
this.fs = fs;
}
public FileStatus getFileStatus() {
@@ -74,10 +82,9 @@ void accept(long id) {
}
public INode toINode(UGIResolver ugi, BlockResolver blk,
BlockAliasMap.Writer<FileRegion> out, String blockPoolID)
throws IOException {
BlockAliasMap.Writer<FileRegion> out) throws IOException {
if (stat.isFile()) {
return toFile(ugi, blk, out, blockPoolID);
return toFile(ugi, blk, out);
} else if (stat.isDirectory()) {
return toDirectory(ugi);
} else if (stat.isSymlink()) {
@@ -103,16 +110,16 @@ public int hashCode() {
return (int)(pId ^ (pId >>> 32));
}
void writeBlock(long blockId, long offset, long length,
long genStamp, String blockPoolID,
BlockAliasMap.Writer<FileRegion> out) throws IOException {
void writeBlock(long blockId, long offset, long length, long genStamp,
PathHandle pathHandle, BlockAliasMap.Writer<FileRegion> out)
throws IOException {
FileStatus s = getFileStatus();
out.store(new FileRegion(blockId, s.getPath(), offset, length, genStamp));
out.store(new FileRegion(blockId, s.getPath(), offset, length, genStamp,
(pathHandle != null ? pathHandle.toByteArray() : new byte[0])));
}
INode toFile(UGIResolver ugi, BlockResolver blk,
BlockAliasMap.Writer<FileRegion> out, String blockPoolID)
throws IOException {
BlockAliasMap.Writer<FileRegion> out) throws IOException {
final FileStatus s = getFileStatus();
// TODO should this store resolver's user/group?
ugi.addUser(s.getOwner());
@@ -124,12 +131,23 @@ INode toFile(UGIResolver ugi, BlockResolver blk,
.setPreferredBlockSize(blk.preferredBlockSize(s))
.setPermission(ugi.resolve(s))
.setStoragePolicyID(HdfsConstants.PROVIDED_STORAGE_POLICY_ID);
// pathhandle allows match as long as the file matches exactly.
PathHandle pathHandle = null;
if (fs != null) {
try {
pathHandle = fs.getPathHandle(s, Options.HandleOpt.exact());
} catch (UnsupportedOperationException e) {
LOG.warn(
"Exact path handle not supported by filesystem " + fs.toString());
}
}
//TODO: storage policy should be configurable per path; use BlockResolver
long off = 0L;
for (BlockProto block : blk.resolve(s)) {
b.addBlocks(block);
writeBlock(block.getBlockId(), off, block.getNumBytes(),
block.getGenStamp(), blockPoolID, out);
block.getGenStamp(), pathHandle, out);
off += block.getNumBytes();
}
INode.Builder ib = INode.newBuilder()
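
Note on the handle options: the writer above requests an exact handle, while the datanode test earlier in this commit requests changed(true) and moved(true). As I read org.apache.hadoop.fs.Options (and the test's rename check), these flags control when a stored handle may still resolve. A hedged sketch contrasting the two combinations; the helper class is illustrative, not from this commit:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.PathHandle;

/** Illustrative helpers contrasting the HandleOpt combinations in this commit. */
class HandleOptExamples {

  // What the image writer requests: the handle only resolves while the file
  // is neither modified nor renamed, pinning exactly the bytes that were
  // resolved into blocks.
  static PathHandle exact(FileSystem fs, FileStatus stat) throws IOException {
    return fs.getPathHandle(stat, Options.HandleOpt.exact());
  }

  // What the datanode test requests: the handle keeps resolving after a
  // rename (and after content changes), which is why the test can still read
  // the replica once the file has been moved.
  static PathHandle byReference(FileSystem fs, FileStatus stat)
      throws IOException {
    return fs.getPathHandle(stat,
        Options.HandleOpt.changed(true), Options.HandleOpt.moved(true));
  }
}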

View File

@@ -97,7 +97,7 @@ protected Iterable<TreePath> getChildren(TreePath p, long id,
int nChildren = r.nextInt(children);
ArrayList<TreePath> ret = new ArrayList<TreePath>();
for (int i = 0; i < nChildren; ++i) {
ret.add(new TreePath(genFileStatus(p, r), p.getId(), walk));
ret.add(new TreePath(genFileStatus(p, r), p.getId(), walk, null));
}
return ret;
}
@@ -165,12 +165,12 @@ class RandomTreeIterator extends TreeIterator {
RandomTreeIterator(long seed) {
Random r = new Random(seed);
FileStatus iroot = genFileStatus(null, r);
getPendingQueue().addFirst(new TreePath(iroot, -1, this));
getPendingQueue().addFirst(new TreePath(iroot, -1, this, null));
}
RandomTreeIterator(TreePath p) {
getPendingQueue().addFirst(
new TreePath(p.getFileStatus(), p.getParentId(), this));
new TreePath(p.getFileStatus(), p.getParentId(), this, null));
}
@Override