HDFS-12713. [READ] Refactor FileRegion and BlockAliasMap to separate out HDFS metadata and PROVIDED storage metadata. Contributed by Ewan Higgs

This commit is contained in:
Virajith Jalaparti 2017-12-05 13:46:30 -08:00 committed by Chris Douglas
parent a027055dd2
commit 9c35be86e1
29 changed files with 349 additions and 168 deletions

View File

@ -342,17 +342,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id"; public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED"; public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class"; public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class";
public static final String DFS_PROVIDED_ALIASMAP_LOAD_RETRIES = "dfs.provided.aliasmap.load.retries";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER = "dfs.provided.aliasmap.text.delimiter"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER = "dfs.provided.aliasmap.text.delimiter";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT = ","; public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT = ",";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH = "dfs.provided.aliasmap.text.read.path"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE = "dfs.provided.aliasmap.text.read.file";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT = "file:///tmp/blocks.csv"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE_DEFAULT = "file:///tmp/blocks.csv";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path"; public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR = "dfs.provided.aliasmap.text.write.dir";
public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR_DEFAULT = "file:///tmp/";
public static final String DFS_PROVIDED_ALIASMAP_LEVELDB_PATH = "dfs.provided.aliasmap.leveldb.read.path"; public static final String DFS_PROVIDED_ALIASMAP_LEVELDB_PATH = "dfs.provided.aliasmap.leveldb.path";
public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final int DFS_LIST_LIMIT_DEFAULT = 1000;

View File

@ -121,4 +121,14 @@ public ListResponseProto list(RpcController controller,
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
public BlockPoolResponseProto getBlockPoolId(RpcController controller,
BlockPoolRequestProto req) throws ServiceException {
try {
String bpid = aliasMap.getBlockPoolId();
return BlockPoolResponseProto.newBuilder().setBlockPoolId(bpid).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -73,7 +73,8 @@ public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null, RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
conf, NetUtils.getDefaultSocketFactory(conf), 0); conf, NetUtils.getDefaultSocketFactory(conf), 0);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); throw new RuntimeException(
"Error in connecting to " + addr + " Got: " + e);
} }
} }
@ -93,8 +94,7 @@ public InMemoryAliasMap.IterationResult list(Optional<Block> marker)
.stream() .stream()
.map(kv -> new FileRegion( .map(kv -> new FileRegion(
PBHelperClient.convert(kv.getKey()), PBHelperClient.convert(kv.getKey()),
PBHelperClient.convert(kv.getValue()), PBHelperClient.convert(kv.getValue())
null
)) ))
.collect(Collectors.toList()); .collect(Collectors.toList());
BlockProto nextMarker = response.getNextMarker(); BlockProto nextMarker = response.getNextMarker();
@ -157,6 +157,17 @@ public void write(@Nonnull Block block,
} }
} }
@Override
public String getBlockPoolId() throws IOException {
try {
BlockPoolResponseProto response = rpcProxy.getBlockPoolId(null,
BlockPoolRequestProto.newBuilder().build());
return response.getBlockPoolId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
public void stop() { public void stop() {
RPC.stopProxy(rpcProxy); RPC.stopProxy(rpcProxy);
} }

View File

@ -1122,6 +1122,6 @@ public static KeyValueProto convert(FileRegion fileRegion) {
ProvidedStorageLocation providedStorageLocation = ProvidedStorageLocation providedStorageLocation =
PBHelperClient.convert(providedStorageLocationProto); PBHelperClient.convert(providedStorageLocationProto);
return new FileRegion(block, providedStorageLocation, null); return new FileRegion(block, providedStorageLocation);
} }
} }

View File

@ -121,7 +121,7 @@ public IterationResult list(Optional<Block> marker) throws IOException {
Block block = fromBlockBytes(entry.getKey()); Block block = fromBlockBytes(entry.getKey());
ProvidedStorageLocation providedStorageLocation = ProvidedStorageLocation providedStorageLocation =
fromProvidedStorageLocationBytes(entry.getValue()); fromProvidedStorageLocationBytes(entry.getValue());
batch.add(new FileRegion(block, providedStorageLocation, null)); batch.add(new FileRegion(block, providedStorageLocation));
++i; ++i;
} }
if (iterator.hasNext()) { if (iterator.hasNext()) {
@ -157,6 +157,11 @@ public void write(@Nonnull Block block,
levelDb.put(extendedBlockDbFormat, providedStorageLocationDbFormat); levelDb.put(extendedBlockDbFormat, providedStorageLocationDbFormat);
} }
@Override
public String getBlockPoolId() {
return null;
}
public void close() throws IOException { public void close() throws IOException {
levelDb.close(); levelDb.close();
} }

View File

@ -93,4 +93,11 @@ Optional<ProvidedStorageLocation> read(@Nonnull Block block)
void write(@Nonnull Block block, void write(@Nonnull Block block,
@Nonnull ProvidedStorageLocation providedStorageLocation) @Nonnull ProvidedStorageLocation providedStorageLocation)
throws IOException; throws IOException;
/**
* Get the associated block pool id.
* @return the block pool id associated with the Namenode running
* the in-memory alias map.
*/
String getBlockPoolId() throws IOException;
} }

View File

@ -55,11 +55,13 @@ public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
private RPC.Server aliasMapServer; private RPC.Server aliasMapServer;
private Configuration conf; private Configuration conf;
private InMemoryAliasMap aliasMap; private InMemoryAliasMap aliasMap;
private String blockPoolId;
public InMemoryLevelDBAliasMapServer( public InMemoryLevelDBAliasMapServer(
CheckedFunction<Configuration, InMemoryAliasMap> initFun) { CheckedFunction<Configuration, InMemoryAliasMap> initFun,
String blockPoolId) {
this.initFun = initFun; this.initFun = initFun;
this.blockPoolId = blockPoolId;
} }
public void start() throws IOException { public void start() throws IOException {
@ -92,7 +94,7 @@ public void start() throws IOException {
.setVerbose(true) .setVerbose(true)
.build(); .build();
LOG.info("Starting InMemoryLevelDBAliasMapServer on ", rpcAddress); LOG.info("Starting InMemoryLevelDBAliasMapServer on {}", rpcAddress);
aliasMapServer.start(); aliasMapServer.start();
} }
@ -116,6 +118,11 @@ public void write(@Nonnull Block block,
aliasMap.write(block, providedStorageLocation); aliasMap.write(block, providedStorageLocation);
} }
@Override
public String getBlockPoolId() {
return blockPoolId;
}
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;

View File

@ -152,8 +152,12 @@ private void processProvidedStorageReport()
LOG.info("Calling process first blk report from storage: " LOG.info("Calling process first blk report from storage: "
+ providedStorageInfo); + providedStorageInfo);
// first pass; periodic refresh should call bm.processReport // first pass; periodic refresh should call bm.processReport
bm.processFirstBlockReport(providedStorageInfo, BlockAliasMap.Reader<BlockAlias> reader =
new ProvidedBlockList(aliasMap.getReader(null).iterator())); aliasMap.getReader(null, bm.getBlockPoolId());
if (reader != null) {
bm.processFirstBlockReport(providedStorageInfo,
new ProvidedBlockList(reader.iterator()));
}
} }
} }

View File

@ -34,39 +34,21 @@
public class FileRegion implements BlockAlias { public class FileRegion implements BlockAlias {
private final Pair<Block, ProvidedStorageLocation> pair; private final Pair<Block, ProvidedStorageLocation> pair;
private final String bpid;
public FileRegion(long blockId, Path path, long offset,
long length, String bpid, long genStamp) {
this(new Block(blockId, length, genStamp),
new ProvidedStorageLocation(path, offset, length, new byte[0]), bpid);
}
public FileRegion(long blockId, Path path, long offset,
long length, String bpid) {
this(blockId, path, offset, length, bpid,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
}
public FileRegion(long blockId, Path path, long offset, public FileRegion(long blockId, Path path, long offset,
long length, long genStamp) { long length, long genStamp) {
this(blockId, path, offset, length, null, genStamp); this(new Block(blockId, length, genStamp),
new ProvidedStorageLocation(path, offset, length, new byte[0]));
}
public FileRegion(long blockId, Path path, long offset, long length) {
this(blockId, path, offset, length,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
} }
public FileRegion(Block block, public FileRegion(Block block,
ProvidedStorageLocation providedStorageLocation) { ProvidedStorageLocation providedStorageLocation) {
this.pair = Pair.of(block, providedStorageLocation); this.pair = Pair.of(block, providedStorageLocation);
this.bpid = null;
}
public FileRegion(Block block,
ProvidedStorageLocation providedStorageLocation, String bpid) {
this.pair = Pair.of(block, providedStorageLocation);
this.bpid = bpid;
}
public FileRegion(long blockId, Path path, long offset, long length) {
this(blockId, path, offset, length, null);
} }
public Block getBlock() { public Block getBlock() {
@ -77,10 +59,6 @@ public ProvidedStorageLocation getProvidedStorageLocation() {
return pair.getValue(); return pair.getValue();
} }
public String getBlockPoolId() {
return this.bpid;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {

View File

@ -61,20 +61,22 @@ public interface Options { }
/** /**
* @param ident block to resolve * @param ident block to resolve
* @return BlockAlias correspoding to the provided block. * @return BlockAlias corresponding to the provided block.
* @throws IOException * @throws IOException
*/ */
public abstract Optional<U> resolve(Block ident) throws IOException; public abstract Optional<U> resolve(Block ident) throws IOException;
} }
/** /**
* Returns a reader to the alias map. * Returns a reader to the alias map.
* @param opts reader options * @param opts reader options
* @return {@link Reader} to the alias map. * @param blockPoolID block pool id to use
* @return {@link Reader} to the alias map. If a Reader for the blockPoolID
* cannot be created, this will return null.
* @throws IOException * @throws IOException
*/ */
public abstract Reader<T> getReader(Reader.Options opts) throws IOException; public abstract Reader<T> getReader(Reader.Options opts, String blockPoolID)
throws IOException;
/** /**
* An abstract class used as a writer for the provided block map. * An abstract class used as a writer for the provided block map.
@ -93,10 +95,12 @@ public interface Options { }
/** /**
* Returns the writer for the alias map. * Returns the writer for the alias map.
* @param opts writer options. * @param opts writer options.
* @param blockPoolID block pool id to use
* @return {@link Writer} to the alias map. * @return {@link Writer} to the alias map.
* @throws IOException * @throws IOException
*/ */
public abstract Writer<T> getWriter(Writer.Options opts) throws IOException; public abstract Writer<T> getWriter(Writer.Options opts, String blockPoolID)
throws IOException;
/** /**
* Refresh the alias map. * Refresh the alias map.

View File

@ -46,6 +46,7 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
private Configuration conf; private Configuration conf;
private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap; private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
private String blockPoolID;
@Override @Override
public void close() { public void close() {
@ -57,7 +58,7 @@ class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
@Override @Override
public Optional<FileRegion> resolve(Block block) throws IOException { public Optional<FileRegion> resolve(Block block) throws IOException {
Optional<ProvidedStorageLocation> read = aliasMap.read(block); Optional<ProvidedStorageLocation> read = aliasMap.read(block);
return read.map(psl -> new FileRegion(block, psl, null)); return read.map(psl -> new FileRegion(block, psl));
} }
@Override @Override
@ -133,12 +134,29 @@ public void close() throws IOException {
@Override @Override
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException { public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
if (this.blockPoolID == null) {
this.blockPoolID = aliasMap.getBlockPoolId();
}
// if a block pool id has been supplied, and doesn't match the associated
// block pool id, return null.
if (blockPoolID != null && this.blockPoolID != null
&& !this.blockPoolID.equals(blockPoolID)) {
return null;
}
return new LevelDbReader(); return new LevelDbReader();
} }
@Override @Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
if (this.blockPoolID == null) {
this.blockPoolID = aliasMap.getBlockPoolId();
}
if (blockPoolID != null && !this.blockPoolID.equals(blockPoolID)) {
return null;
}
return new LevelDbWriter(); return new LevelDbWriter();
} }

View File

@ -70,7 +70,8 @@ public Configuration getConf() {
} }
@Override @Override
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException { public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
if (null == opts) { if (null == opts) {
opts = this.opts; opts = this.opts;
} }
@ -79,11 +80,12 @@ public Reader<FileRegion> getReader(Reader.Options opts) throws IOException {
} }
LevelDBOptions o = (LevelDBOptions) opts; LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBReader( return new LevelDBFileRegionAliasMap.LevelDBReader(
createDB(o.levelDBPath, false)); createDB(o.levelDBPath, false, blockPoolID));
} }
@Override @Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
if (null == opts) { if (null == opts) {
opts = this.opts; opts = this.opts;
} }
@ -92,11 +94,11 @@ public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
} }
LevelDBOptions o = (LevelDBOptions) opts; LevelDBOptions o = (LevelDBOptions) opts;
return new LevelDBFileRegionAliasMap.LevelDBWriter( return new LevelDBFileRegionAliasMap.LevelDBWriter(
createDB(o.levelDBPath, true)); createDB(o.levelDBPath, true, blockPoolID));
} }
private static DB createDB(String levelDBPath, boolean createIfMissing) private static DB createDB(String levelDBPath, boolean createIfMissing,
throws IOException { String blockPoolID) throws IOException {
if (levelDBPath == null || levelDBPath.length() == 0) { if (levelDBPath == null || levelDBPath.length() == 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"A valid path needs to be specified for " "A valid path needs to be specified for "
@ -105,7 +107,13 @@ private static DB createDB(String levelDBPath, boolean createIfMissing)
} }
org.iq80.leveldb.Options options = new org.iq80.leveldb.Options(); org.iq80.leveldb.Options options = new org.iq80.leveldb.Options();
options.createIfMissing(createIfMissing); options.createIfMissing(createIfMissing);
return factory.open(new File(levelDBPath), options); File dbFile;
if (blockPoolID != null) {
dbFile = new File(levelDBPath, blockPoolID);
} else {
dbFile = new File(levelDBPath);
}
return factory.open(dbFile, options);
} }
@Override @Override

View File

@ -82,7 +82,7 @@ public Configuration getConf() {
} }
@Override @Override
public Reader<FileRegion> getReader(Reader.Options opts) public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException { throws IOException {
if (null == opts) { if (null == opts) {
opts = readerOpts; opts = readerOpts;
@ -94,23 +94,29 @@ public Reader<FileRegion> getReader(Reader.Options opts)
Configuration readerConf = (null == o.getConf()) Configuration readerConf = (null == o.getConf())
? new Configuration() ? new Configuration()
: o.getConf(); : o.getConf();
return createReader(o.file, o.delim, readerConf); return createReader(o.file, o.delim, readerConf, blockPoolID);
} }
@VisibleForTesting @VisibleForTesting
TextReader createReader(Path file, String delim, Configuration cfg) TextReader createReader(Path file, String delim, Configuration cfg,
throws IOException { String blockPoolID) throws IOException {
FileSystem fs = file.getFileSystem(cfg); FileSystem fs = file.getFileSystem(cfg);
if (fs instanceof LocalFileSystem) { if (fs instanceof LocalFileSystem) {
fs = ((LocalFileSystem)fs).getRaw(); fs = ((LocalFileSystem)fs).getRaw();
} }
CompressionCodecFactory factory = new CompressionCodecFactory(cfg); CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
CompressionCodec codec = factory.getCodec(file); CompressionCodec codec = factory.getCodec(file);
return new TextReader(fs, file, codec, delim); String filename = fileNameFromBlockPoolID(blockPoolID);
if (codec != null) {
filename = filename + codec.getDefaultExtension();
}
Path bpidFilePath = new Path(file.getParent(), filename);
return new TextReader(fs, bpidFilePath, codec, delim);
} }
@Override @Override
public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
if (null == opts) { if (null == opts) {
opts = writerOpts; opts = writerOpts;
} }
@ -121,14 +127,15 @@ public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
Configuration cfg = (null == o.getConf()) Configuration cfg = (null == o.getConf())
? new Configuration() ? new Configuration()
: o.getConf(); : o.getConf();
String baseName = fileNameFromBlockPoolID(blockPoolID);
Path blocksFile = new Path(o.dir, baseName);
if (o.codec != null) { if (o.codec != null) {
CompressionCodecFactory factory = new CompressionCodecFactory(cfg); CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
CompressionCodec codec = factory.getCodecByName(o.codec); CompressionCodec codec = factory.getCodecByName(o.codec);
String name = o.file.getName() + codec.getDefaultExtension(); blocksFile = new Path(o.dir, baseName + codec.getDefaultExtension());
o.filename(new Path(o.file.getParent(), name)); return createWriter(blocksFile, codec, o.delim, cfg);
return createWriter(o.file, codec, o.delim, cfg);
} }
return createWriter(o.file, null, o.delim, conf); return createWriter(blocksFile, null, o.delim, conf);
} }
@VisibleForTesting @VisibleForTesting
@ -154,15 +161,15 @@ public static class ReaderOptions
private String delim = private String delim =
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT; DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
private Path file = new Path( private Path file = new Path(
new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT).toURI() new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE_DEFAULT)
.toString()); .toURI().toString());
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
String tmpfile = String tmpfile =
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH, conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE_DEFAULT);
file = new Path(tmpfile); file = new Path(tmpfile);
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT); DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
@ -195,17 +202,17 @@ public static class WriterOptions
private Configuration conf; private Configuration conf;
private String codec = null; private String codec = null;
private Path file = private Path dir =
new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR_DEFAULT);
private String delim = private String delim =
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT; DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
String tmpfile = conf.get( String tmpDir = conf.get(
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH, file.toString()); DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR, dir.toString());
file = new Path(tmpfile); dir = new Path(tmpDir);
codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC); codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC);
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT); DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
@ -217,8 +224,8 @@ public Configuration getConf() {
} }
@Override @Override
public WriterOptions filename(Path file) { public WriterOptions dirName(Path dir) {
this.file = file; this.dir = dir;
return this; return this;
} }
@ -226,8 +233,8 @@ public String getCodec() {
return codec; return codec;
} }
public Path getFile() { public Path getDir() {
return file; return dir;
} }
@Override @Override
@ -267,6 +274,7 @@ public static ReaderOptions defaults() {
private final FileSystem fs; private final FileSystem fs;
private final CompressionCodec codec; private final CompressionCodec codec;
private final Map<FRIterator, BufferedReader> iterators; private final Map<FRIterator, BufferedReader> iterators;
private final String blockPoolID;
protected TextReader(FileSystem fs, Path file, CompressionCodec codec, protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
String delim) { String delim) {
@ -281,6 +289,7 @@ protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
this.codec = codec; this.codec = codec;
this.delim = delim; this.delim = delim;
this.iterators = Collections.synchronizedMap(iterators); this.iterators = Collections.synchronizedMap(iterators);
this.blockPoolID = blockPoolIDFromFileName(file);
} }
@Override @Override
@ -344,12 +353,11 @@ private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
return null; return null;
} }
String[] f = line.split(delim); String[] f = line.split(delim);
if (f.length != 6) { if (f.length != 5) {
throw new IOException("Invalid line: " + line); throw new IOException("Invalid line: " + line);
} }
return new FileRegion(Long.parseLong(f[0]), new Path(f[1]), return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
Long.parseLong(f[2]), Long.parseLong(f[3]), f[4], Long.parseLong(f[2]), Long.parseLong(f[3]), Long.parseLong(f[4]));
Long.parseLong(f[5]));
} }
public InputStream createStream() throws IOException { public InputStream createStream() throws IOException {
@ -409,7 +417,7 @@ public static class TextWriter extends Writer<FileRegion> {
*/ */
public interface Options extends Writer.Options { public interface Options extends Writer.Options {
Options codec(String codec); Options codec(String codec);
Options filename(Path file); Options dirName(Path dir);
Options delimiter(String delim); Options delimiter(String delim);
} }
@ -434,7 +442,6 @@ public void store(FileRegion token) throws IOException {
out.append(psl.getPath().toString()).append(delim); out.append(psl.getPath().toString()).append(delim);
out.append(Long.toString(psl.getOffset())).append(delim); out.append(Long.toString(psl.getOffset())).append(delim);
out.append(Long.toString(psl.getLength())).append(delim); out.append(Long.toString(psl.getLength())).append(delim);
out.append(token.getBlockPoolId()).append(delim);
out.append(Long.toString(block.getGenerationStamp())).append(delim); out.append(Long.toString(block.getGenerationStamp())).append(delim);
out.append("\n"); out.append("\n");
} }
@ -457,4 +464,17 @@ public void close() throws IOException {
//nothing to do; //nothing to do;
} }
@VisibleForTesting
public static String blockPoolIDFromFileName(Path file) {
if (file == null) {
return "";
}
String fileName = file.getName();
return fileName.substring("blocks_".length()).split("\\.")[0];
}
@VisibleForTesting
public static String fileNameFromBlockPoolID(String blockPoolID) {
return "blocks_" + blockPoolID + ".csv";
}
} }

View File

@ -63,6 +63,8 @@
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
/** /**
* This class is used to create provided volumes. * This class is used to create provided volumes.
*/ */
@ -125,6 +127,7 @@ static class ProvidedBlockPoolSlice {
private ReplicaMap bpVolumeMap; private ReplicaMap bpVolumeMap;
private ProvidedVolumeDF df; private ProvidedVolumeDF df;
private AtomicLong numOfBlocks = new AtomicLong(); private AtomicLong numOfBlocks = new AtomicLong();
private int numRetries;
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) { Configuration conf) {
@ -138,6 +141,7 @@ static class ProvidedBlockPoolSlice {
this.bpid = bpid; this.bpid = bpid;
this.df = new ProvidedVolumeDF(); this.df = new ProvidedVolumeDF();
bpVolumeMap.initBlockPool(bpid); bpVolumeMap.initBlockPool(bpid);
this.numRetries = conf.getInt(DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 0);
LOG.info("Created alias map using class: " + aliasMap.getClass()); LOG.info("Created alias map using class: " + aliasMap.getClass());
} }
@ -153,18 +157,27 @@ void setFileRegionProvider(BlockAliasMap<FileRegion> blockAliasMap) {
void fetchVolumeMap(ReplicaMap volumeMap, void fetchVolumeMap(ReplicaMap volumeMap,
RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS) RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
throws IOException { throws IOException {
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); BlockAliasMap.Reader<FileRegion> reader = null;
int tries = 1;
do {
try {
reader = aliasMap.getReader(null, bpid);
break;
} catch (IOException e) {
tries++;
reader = null;
}
} while (tries <= numRetries);
if (reader == null) { if (reader == null) {
LOG.warn("Got null reader from BlockAliasMap " + aliasMap LOG.error("Got null reader from BlockAliasMap " + aliasMap
+ "; no blocks will be populated"); + "; no blocks will be populated");
return; return;
} }
Path blockPrefixPath = new Path(providedVolume.getBaseURI()); Path blockPrefixPath = new Path(providedVolume.getBaseURI());
for (FileRegion region : reader) { for (FileRegion region : reader) {
if (region.getBlockPoolId() != null if (containsBlock(providedVolume.baseURI,
&& region.getBlockPoolId().equals(bpid) region.getProvidedStorageLocation().getPath().toUri())) {
&& containsBlock(providedVolume.baseURI,
region.getProvidedStorageLocation().getPath().toUri())) {
String blockSuffix = getSuffix(blockPrefixPath, String blockSuffix = getSuffix(blockPrefixPath,
new Path(region.getProvidedStorageLocation().getPath().toUri())); new Path(region.getProvidedStorageLocation().getPath().toUri()));
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED) ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
@ -215,14 +228,12 @@ public void compileReport(LinkedList<ScanInfo> report,
* the ids remain the same. * the ids remain the same.
*/ */
aliasMap.refresh(); aliasMap.refresh();
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null, bpid);
for (FileRegion region : reader) { for (FileRegion region : reader) {
reportCompiler.throttle(); reportCompiler.throttle();
if (region.getBlockPoolId().equals(bpid)) { report.add(new ScanInfo(region.getBlock().getBlockId(),
report.add(new ScanInfo(region.getBlock().getBlockId(), providedVolume, region,
providedVolume, region, region.getProvidedStorageLocation().getLength()));
region.getProvidedStorageLocation().getLength()));
}
} }
} }
@ -415,9 +426,7 @@ public ExtendedBlock nextBlock() throws IOException {
if (temp.getBlock().getBlockId() < state.lastBlockId) { if (temp.getBlock().getBlockId() < state.lastBlockId) {
continue; continue;
} }
if (temp.getBlockPoolId().equals(bpid)) { nextRegion = temp;
nextRegion = temp;
}
} }
if (null == nextRegion) { if (null == nextRegion) {
return null; return null;
@ -435,7 +444,7 @@ public boolean atEnd() {
public void rewind() { public void rewind() {
BlockAliasMap.Reader<FileRegion> reader = null; BlockAliasMap.Reader<FileRegion> reader = null;
try { try {
reader = blockAliasMap.getReader(null); reader = blockAliasMap.getReader(null, bpid);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception in getting reader from provided alias map"); LOG.warn("Exception in getting reader from provided alias map");
} }

View File

@ -729,6 +729,7 @@ protected void initialize(Configuration conf) throws IOException {
} }
loadNamesystem(conf); loadNamesystem(conf);
startAliasMapServerIfNecessary(conf);
rpcServer = createRpcServer(conf); rpcServer = createRpcServer(conf);
@ -749,7 +750,6 @@ protected void initialize(Configuration conf) throws IOException {
startCommonServices(conf); startCommonServices(conf);
startMetricsLogger(conf); startMetricsLogger(conf);
startAliasMapServerIfNecessary(conf);
} }
private void startAliasMapServerIfNecessary(Configuration conf) private void startAliasMapServerIfNecessary(Configuration conf)
@ -758,8 +758,8 @@ private void startAliasMapServerIfNecessary(Configuration conf)
DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT) DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT)
&& conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, && conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED,
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) { DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) {
levelDBAliasMapServer = levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init); InMemoryAliasMap::init, namesystem.getBlockPoolId());
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
} }

View File

@ -164,6 +164,10 @@ public void setClusterID(String clusterID) {
this.clusterID = clusterID; this.clusterID = clusterID;
} }
public void setBlockPoolID(String blockPoolID) {
this.blockPoolID = blockPoolID;
}
@Override @Override
public String toString(){ public String toString(){
return super.toString() + ";bpid=" + blockPoolID; return super.toString() + ";bpid=" + blockPoolID;

View File

@ -53,8 +53,16 @@ message ListResponseProto {
optional BlockProto nextMarker = 2; optional BlockProto nextMarker = 2;
} }
message BlockPoolRequestProto {
}
message BlockPoolResponseProto {
required string blockPoolId = 1;
}
service AliasMapProtocolService { service AliasMapProtocolService {
rpc write(WriteRequestProto) returns(WriteResponseProto); rpc write(WriteRequestProto) returns(WriteResponseProto);
rpc read(ReadRequestProto) returns(ReadResponseProto); rpc read(ReadRequestProto) returns(ReadResponseProto);
rpc list(ListRequestProto) returns(ListResponseProto); rpc list(ListRequestProto) returns(ListResponseProto);
rpc getBlockPoolId(BlockPoolRequestProto) returns(BlockPoolResponseProto);
} }

View File

@ -4696,7 +4696,7 @@
</property> </property>
<property> <property>
<name>dfs.provided.aliasmap.text.read.path</name> <name>dfs.provided.aliasmap.text.read.file</name>
<value></value> <value></value>
<description> <description>
The path specifying the provided block map as a text file, specified as The path specifying the provided block map as a text file, specified as
@ -4713,7 +4713,7 @@
</property> </property>
<property> <property>
<name>dfs.provided.aliasmap.text.write.path</name> <name>dfs.provided.aliasmap.text.write.dir</name>
<value></value> <value></value>
<description> <description>
The path to which the provided block map should be written as a text The path to which the provided block map should be written as a text
@ -4721,6 +4721,25 @@
</description> </description>
</property> </property>
<property>
<name>dfs.provided.aliasmap.leveldb.path</name>
<value></value>
<description>
The read/write path for the leveldb-based alias map
(org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap).
The path has to be explicitly configured when this alias map is used.
</description>
</property>
<property>
<name>dfs.provided.aliasmap.load.retries</name>
<value>0</value>
<description>
The number of retries on the Datanode to load the provided aliasmap;
defaults to 0.
</description>
</property>
<property> <property>
<name>dfs.lock.suppress.warning.interval</name> <name>dfs.lock.suppress.warning.interval</name>
<value>10s</value> <value>10s</value>

View File

@ -44,6 +44,7 @@ public class TestProvidedStorageMap {
private BlockManager bm; private BlockManager bm;
private RwLock nameSystemLock; private RwLock nameSystemLock;
private String providedStorageID; private String providedStorageID;
private String blockPoolID;
@Before @Before
public void setup() { public void setup() {
@ -55,8 +56,9 @@ public void setup() {
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TestProvidedImpl.TestFileRegionBlockAliasMap.class, TestProvidedImpl.TestFileRegionBlockAliasMap.class,
BlockAliasMap.class); BlockAliasMap.class);
blockPoolID = "BP-12344-10.1.1.2-12344";
bm = mock(BlockManager.class); bm = mock(BlockManager.class);
when(bm.getBlockPoolId()).thenReturn(blockPoolID);
nameSystemLock = mock(RwLock.class); nameSystemLock = mock(RwLock.class);
} }

View File

@ -56,11 +56,10 @@ public class TestInMemoryLevelDBAliasMapClient {
private InMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient; private InMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient;
private File tempDir; private File tempDir;
private Configuration conf; private Configuration conf;
private final static String BPID = "BPID-0";
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init);
conf = new Configuration(); conf = new Configuration();
int port = 9876; int port = 9876;
@ -69,6 +68,8 @@ public void setUp() throws IOException {
tempDir = Files.createTempDir(); tempDir = Files.createTempDir();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDir.getAbsolutePath()); tempDir.getAbsolutePath());
levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, BPID);
inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient(); inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient();
} }
@ -81,20 +82,20 @@ public void tearDown() throws IOException {
@Test @Test
public void writeRead() throws Exception { public void writeRead() throws Exception {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient.setConf(conf);
Block block = new Block(42, 43, 44); Block block = new Block(42, 43, 44);
byte[] nonce = "blackbird".getBytes(); byte[] nonce = "blackbird".getBytes();
ProvidedStorageLocation providedStorageLocation ProvidedStorageLocation providedStorageLocation
= new ProvidedStorageLocation(new Path("cuckoo"), = new ProvidedStorageLocation(new Path("cuckoo"),
45, 46, nonce); 45, 46, nonce);
BlockAliasMap.Writer<FileRegion> writer = BlockAliasMap.Writer<FileRegion> writer =
inMemoryLevelDBAliasMapClient.getWriter(null); inMemoryLevelDBAliasMapClient.getWriter(null, BPID);
writer.store(new FileRegion(block, providedStorageLocation)); writer.store(new FileRegion(block, providedStorageLocation));
BlockAliasMap.Reader<FileRegion> reader = BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null); inMemoryLevelDBAliasMapClient.getReader(null, BPID);
Optional<FileRegion> fileRegion = reader.resolve(block); Optional<FileRegion> fileRegion = reader.resolve(block);
assertEquals(new FileRegion(block, providedStorageLocation), assertEquals(new FileRegion(block, providedStorageLocation),
fileRegion.get()); fileRegion.get());
@ -102,9 +103,9 @@ public void writeRead() throws Exception {
@Test @Test
public void iterateSingleBatch() throws Exception { public void iterateSingleBatch() throws Exception {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient.setConf(conf);
Block block1 = new Block(42, 43, 44); Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45); Block block2 = new Block(43, 44, 45);
byte[] nonce1 = "blackbird".getBytes(); byte[] nonce1 = "blackbird".getBytes();
@ -116,14 +117,14 @@ public void iterateSingleBatch() throws Exception {
new ProvidedStorageLocation(new Path("falcon"), new ProvidedStorageLocation(new Path("falcon"),
46, 47, nonce2); 46, 47, nonce2);
BlockAliasMap.Writer<FileRegion> writer1 = BlockAliasMap.Writer<FileRegion> writer1 =
inMemoryLevelDBAliasMapClient.getWriter(null); inMemoryLevelDBAliasMapClient.getWriter(null, BPID);
writer1.store(new FileRegion(block1, providedStorageLocation1)); writer1.store(new FileRegion(block1, providedStorageLocation1));
BlockAliasMap.Writer<FileRegion> writer2 = BlockAliasMap.Writer<FileRegion> writer2 =
inMemoryLevelDBAliasMapClient.getWriter(null); inMemoryLevelDBAliasMapClient.getWriter(null, BPID);
writer2.store(new FileRegion(block2, providedStorageLocation2)); writer2.store(new FileRegion(block2, providedStorageLocation2));
BlockAliasMap.Reader<FileRegion> reader = BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null); inMemoryLevelDBAliasMapClient.getReader(null, BPID);
List<FileRegion> actualFileRegions = List<FileRegion> actualFileRegions =
Lists.newArrayListWithCapacity(2); Lists.newArrayListWithCapacity(2);
for (FileRegion fileRegion : reader) { for (FileRegion fileRegion : reader) {
@ -140,8 +141,8 @@ public void iterateSingleBatch() throws Exception {
public void iterateThreeBatches() throws Exception { public void iterateThreeBatches() throws Exception {
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, "2"); conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, "2");
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient.setConf(conf);
Block block1 = new Block(42, 43, 44); Block block1 = new Block(42, 43, 44);
Block block2 = new Block(43, 44, 45); Block block2 = new Block(43, 44, 45);
Block block3 = new Block(44, 45, 46); Block block3 = new Block(44, 45, 46);
@ -173,26 +174,26 @@ public void iterateThreeBatches() throws Exception {
new ProvidedStorageLocation(new Path("duck"), new ProvidedStorageLocation(new Path("duck"),
56, 57, nonce6); 56, 57, nonce6);
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block1, providedStorageLocation1)); .store(new FileRegion(block1, providedStorageLocation1));
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block2, providedStorageLocation2)); .store(new FileRegion(block2, providedStorageLocation2));
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block3, providedStorageLocation3)); .store(new FileRegion(block3, providedStorageLocation3));
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block4, providedStorageLocation4)); .store(new FileRegion(block4, providedStorageLocation4));
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block5, providedStorageLocation5)); .store(new FileRegion(block5, providedStorageLocation5));
inMemoryLevelDBAliasMapClient inMemoryLevelDBAliasMapClient
.getWriter(null) .getWriter(null, BPID)
.store(new FileRegion(block6, providedStorageLocation6)); .store(new FileRegion(block6, providedStorageLocation6));
BlockAliasMap.Reader<FileRegion> reader = BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null); inMemoryLevelDBAliasMapClient.getReader(null, BPID);
List<FileRegion> actualFileRegions = List<FileRegion> actualFileRegions =
Lists.newArrayListWithCapacity(6); Lists.newArrayListWithCapacity(6);
for (FileRegion fileRegion : reader) { for (FileRegion fileRegion : reader) {
@ -278,9 +279,9 @@ public FileRegion generateRandomFileRegion(int seed) {
@Test @Test
public void multipleReads() throws IOException { public void multipleReads() throws IOException {
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient.setConf(conf);
Random r = new Random(); Random r = new Random();
List<FileRegion> expectedFileRegions = r.ints(0, 200) List<FileRegion> expectedFileRegions = r.ints(0, 200)
@ -291,9 +292,9 @@ public void multipleReads() throws IOException {
BlockAliasMap.Reader<FileRegion> reader = BlockAliasMap.Reader<FileRegion> reader =
inMemoryLevelDBAliasMapClient.getReader(null); inMemoryLevelDBAliasMapClient.getReader(null, BPID);
BlockAliasMap.Writer<FileRegion> writer = BlockAliasMap.Writer<FileRegion> writer =
inMemoryLevelDBAliasMapClient.getWriter(null); inMemoryLevelDBAliasMapClient.getWriter(null, BPID);
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newCachedThreadPool();

View File

@ -35,6 +35,8 @@
*/ */
public class TestLevelDBFileRegionAliasMap { public class TestLevelDBFileRegionAliasMap {
private static final String BPID = "BPID-0";
/** /**
* A basic test to verify that we can write data and read it back again. * A basic test to verify that we can write data and read it back again.
* @throws Exception * @throws Exception
@ -48,13 +50,13 @@ public void testReadBack() throws Exception {
LevelDBFileRegionAliasMap.LevelDBOptions opts = LevelDBFileRegionAliasMap.LevelDBOptions opts =
new LevelDBFileRegionAliasMap.LevelDBOptions() new LevelDBFileRegionAliasMap.LevelDBOptions()
.filename(dbFile.getAbsolutePath()); .filename(dbFile.getAbsolutePath());
BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts); BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts, BPID);
FileRegion fr = new FileRegion(1, new Path("/file"), 1, 1, 1); FileRegion fr = new FileRegion(1, new Path("/file"), 1, 1, 1);
writer.store(fr); writer.store(fr);
writer.close(); writer.close();
BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts); BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts, BPID);
FileRegion fr2 = reader.resolve(new Block(1, 1, 1)).get(); FileRegion fr2 = reader.resolve(new Block(1, 1, 1)).get();
assertEquals(fr, fr2); assertEquals(fr, fr2);
reader.close(); reader.close();
@ -86,14 +88,14 @@ public void testIterate() throws Exception {
LevelDBFileRegionAliasMap.LevelDBOptions opts = LevelDBFileRegionAliasMap.LevelDBOptions opts =
new LevelDBFileRegionAliasMap.LevelDBOptions() new LevelDBFileRegionAliasMap.LevelDBOptions()
.filename(dbFile.getAbsolutePath()); .filename(dbFile.getAbsolutePath());
BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts); BlockAliasMap.Writer<FileRegion> writer = frf.getWriter(opts, BPID);
for (FileRegion fr : regions) { for (FileRegion fr : regions) {
writer.store(fr); writer.store(fr);
} }
writer.close(); writer.close();
BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts); BlockAliasMap.Reader<FileRegion> reader = frf.getReader(opts, BPID);
Iterator<FileRegion> it = reader.iterator(); Iterator<FileRegion> it = reader.iterator();
int last = -1; int last = -1;
int count = 0; int count = 0;

View File

@ -36,6 +36,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* Tests the in-memory alias map with a mock level-db implementation. * Tests the in-memory alias map with a mock level-db implementation.
@ -46,12 +47,14 @@ public class TestLevelDbMockAliasMapClient {
private File tempDir; private File tempDir;
private Configuration conf; private Configuration conf;
private InMemoryAliasMap aliasMapMock; private InMemoryAliasMap aliasMapMock;
private final String bpid = "BPID-0";
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
aliasMapMock = mock(InMemoryAliasMap.class); aliasMapMock = mock(InMemoryAliasMap.class);
when(aliasMapMock.getBlockPoolId()).thenReturn(bpid);
levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer( levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
config -> aliasMapMock); config -> aliasMapMock, bpid);
conf = new Configuration(); conf = new Configuration();
int port = 9877; int port = 9877;
@ -60,10 +63,10 @@ public void setUp() throws IOException {
tempDir = Files.createTempDir(); tempDir = Files.createTempDir();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDir.getAbsolutePath()); tempDir.getAbsolutePath());
inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient();
inMemoryLevelDBAliasMapClient.setConf(conf);
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();
inMemoryLevelDBAliasMapClient = new InMemoryLevelDBAliasMapClient();
inMemoryLevelDBAliasMapClient.setConf(conf);
} }
@After @After
@ -83,11 +86,13 @@ public void readFailure() throws Exception {
assertThatExceptionOfType(IOException.class) assertThatExceptionOfType(IOException.class)
.isThrownBy(() -> .isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getReader(null).resolve(block)); inMemoryLevelDBAliasMapClient.getReader(null, bpid)
.resolve(block));
assertThatExceptionOfType(IOException.class) assertThatExceptionOfType(IOException.class)
.isThrownBy(() -> .isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getReader(null).resolve(block)); inMemoryLevelDBAliasMapClient.getReader(null, bpid)
.resolve(block));
} }
@Test @Test
@ -104,12 +109,12 @@ public void writeFailure() throws IOException {
assertThatExceptionOfType(IOException.class) assertThatExceptionOfType(IOException.class)
.isThrownBy(() -> .isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getWriter(null) inMemoryLevelDBAliasMapClient.getWriter(null, bpid)
.store(new FileRegion(block, providedStorageLocation))); .store(new FileRegion(block, providedStorageLocation)));
assertThatExceptionOfType(IOException.class) assertThatExceptionOfType(IOException.class)
.isThrownBy(() -> .isThrownBy(() ->
inMemoryLevelDBAliasMapClient.getWriter(null) inMemoryLevelDBAliasMapClient.getWriter(null, bpid)
.store(new FileRegion(block, providedStorageLocation))); .store(new FileRegion(block, providedStorageLocation)));
} }

View File

@ -31,7 +31,10 @@
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
@ -39,7 +42,10 @@
*/ */
public class TestTextBlockAliasMap { public class TestTextBlockAliasMap {
static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt"); static final String OUTFILE_PATH = "hdfs://dummyServer:0000/";
static final String OUTFILE_BASENAME = "dummyFile";
static final Path OUTFILE = new Path(OUTFILE_PATH, OUTFILE_BASENAME + "txt");
static final String BPID = "BPID-0";
void check(TextWriter.Options opts, final Path vp, void check(TextWriter.Options opts, final Path vp,
final Class<? extends CompressionCodec> vc) throws IOException { final Class<? extends CompressionCodec> vc) throws IOException {
@ -56,7 +62,25 @@ public TextWriter createWriter(Path file, CompressionCodec codec,
return null; // ignored return null; // ignored
} }
}; };
mFmt.getWriter(opts); mFmt.getWriter(opts, BPID);
}
void check(TextReader.Options opts, final Path vp,
final Class<? extends CompressionCodec> vc) throws IOException {
TextFileRegionAliasMap aliasMap = new TextFileRegionAliasMap() {
@Override
public TextReader createReader(Path file, String delim, Configuration cfg,
String blockPoolID) throws IOException {
assertEquals(vp, file);
if (null != vc) {
CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
CompressionCodec codec = factory.getCodec(file);
assertEquals(vc, codec.getClass());
}
return null; // ignored
}
};
aliasMap.getReader(opts, BPID);
} }
@Test @Test
@ -64,18 +88,33 @@ public void testWriterOptions() throws Exception {
TextWriter.Options opts = TextWriter.defaults(); TextWriter.Options opts = TextWriter.defaults();
assertTrue(opts instanceof WriterOptions); assertTrue(opts instanceof WriterOptions);
WriterOptions wopts = (WriterOptions) opts; WriterOptions wopts = (WriterOptions) opts;
Path def = new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); Path def =
assertEquals(def, wopts.getFile()); new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR_DEFAULT);
assertEquals(def, wopts.getDir());
assertNull(wopts.getCodec()); assertNull(wopts.getCodec());
opts.filename(OUTFILE); Path cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv");
check(opts, OUTFILE, null); opts.dirName(new Path(OUTFILE_PATH));
check(opts, cp, null);
opts.filename(OUTFILE);
opts.codec("gzip"); opts.codec("gzip");
Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz"); cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv.gz");
check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class); check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
}
@Test
public void testReaderOptions() throws Exception {
TextReader.Options opts = TextReader.defaults();
assertTrue(opts instanceof ReaderOptions);
ReaderOptions ropts = (ReaderOptions) opts;
Path cp = new Path(OUTFILE_PATH, fileNameFromBlockPoolID(BPID));
opts.filename(cp);
check(opts, cp, null);
cp = new Path(OUTFILE_PATH, "blocks_" + BPID + ".csv.gz");
opts.filename(cp);
check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
} }
@Test @Test

View File

@ -150,7 +150,7 @@ public FileRegion next() {
} }
} }
region = new FileRegion(currentCount, new Path(newFile.toString()), region = new FileRegion(currentCount, new Path(newFile.toString()),
0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]); 0, BLK_LEN);
currentCount++; currentCount++;
} }
return region; return region;
@ -194,9 +194,12 @@ public static class TestFileRegionBlockAliasMap
} }
@Override @Override
public Reader<FileRegion> getReader(Reader.Options opts) public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolId)
throws IOException { throws IOException {
if (!blockPoolId.equals(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
return null;
}
BlockAliasMap.Reader<FileRegion> reader = BlockAliasMap.Reader<FileRegion> reader =
new BlockAliasMap.Reader<FileRegion>() { new BlockAliasMap.Reader<FileRegion>() {
@Override @Override
@ -224,7 +227,7 @@ public Optional<FileRegion> resolve(Block ident)
} }
@Override @Override
public Writer<FileRegion> getWriter(Writer.Options opts) public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolId)
throws IOException { throws IOException {
// not implemented // not implemented
return null; return null;

View File

@ -73,6 +73,7 @@ static Options options() {
options.addOption("i", "blockidclass", true, "Block resolver class"); options.addOption("i", "blockidclass", true, "Block resolver class");
options.addOption("c", "cachedirs", true, "Max active dirents"); options.addOption("c", "cachedirs", true, "Max active dirents");
options.addOption("cid", "clusterID", true, "Cluster ID"); options.addOption("cid", "clusterID", true, "Cluster ID");
options.addOption("bpid", "blockPoolID", true, "Block Pool ID");
options.addOption("h", "help", false, "Print usage"); options.addOption("h", "help", false, "Print usage");
return options; return options;
} }
@ -120,6 +121,9 @@ public int run(String[] argv) throws Exception {
case "cid": case "cid":
opts.clusterID(o.getValue()); opts.clusterID(o.getValue());
break; break;
case "bpid":
opts.blockPoolID(o.getValue());
break;
default: default:
throw new UnsupportedOperationException("Internal error"); throw new UnsupportedOperationException("Internal error");
} }

View File

@ -134,6 +134,11 @@ public ImageWriter(Options opts) throws IOException {
if (opts.clusterID.length() > 0) { if (opts.clusterID.length() > 0) {
info.setClusterID(opts.clusterID); info.setClusterID(opts.clusterID);
} }
// if block pool id is given
if (opts.blockPoolID.length() > 0) {
info.setBlockPoolID(opts.blockPoolID);
}
stor.format(info); stor.format(info);
blockPoolID = info.getBlockPoolID(); blockPoolID = info.getBlockPoolID();
} }
@ -165,7 +170,7 @@ public ImageWriter(Options opts) throws IOException {
BlockAliasMap<FileRegion> fmt = null == opts.blocks BlockAliasMap<FileRegion> fmt = null == opts.blocks
? ReflectionUtils.newInstance(opts.aliasMap, opts.getConf()) ? ReflectionUtils.newInstance(opts.aliasMap, opts.getConf())
: opts.blocks; : opts.blocks;
blocks = fmt.getWriter(null); blocks = fmt.getWriter(null, blockPoolID);
blockIds = null == opts.blockIds blockIds = null == opts.blockIds
? ReflectionUtils.newInstance(opts.blockIdsClass, opts.getConf()) ? ReflectionUtils.newInstance(opts.blockIdsClass, opts.getConf())
: opts.blockIds; : opts.blockIds;
@ -525,6 +530,7 @@ public static class Options implements Configurable {
private Class<? extends UGIResolver> ugisClass; private Class<? extends UGIResolver> ugisClass;
private BlockAliasMap<FileRegion> blocks; private BlockAliasMap<FileRegion> blocks;
private String clusterID; private String clusterID;
private String blockPoolID;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private Class<? extends BlockAliasMap> aliasMap; private Class<? extends BlockAliasMap> aliasMap;
@ -552,6 +558,7 @@ public void setConf(Configuration conf) {
blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS, blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS,
FixedBlockResolver.class, BlockResolver.class); FixedBlockResolver.class, BlockResolver.class);
clusterID = ""; clusterID = "";
blockPoolID = "";
} }
@Override @Override
@ -614,6 +621,11 @@ public Options clusterID(String clusterID) {
this.clusterID = clusterID; this.clusterID = clusterID;
return this; return this;
} }
public Options blockPoolID(String blockPoolID) {
this.blockPoolID = blockPoolID;
return this;
}
} }
} }

View File

@ -36,7 +36,8 @@
public class NullBlockAliasMap extends BlockAliasMap<FileRegion> { public class NullBlockAliasMap extends BlockAliasMap<FileRegion> {
@Override @Override
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException { public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
throws IOException {
return new Reader<FileRegion>() { return new Reader<FileRegion>() {
@Override @Override
public Iterator<FileRegion> iterator() { public Iterator<FileRegion> iterator() {
@ -69,7 +70,8 @@ public Optional<FileRegion> resolve(Block ident) throws IOException {
} }
@Override @Override
public Writer getWriter(Writer.Options opts) throws IOException { public Writer getWriter(Writer.Options opts, String blockPoolID)
throws IOException {
return new Writer<FileRegion>() { return new Writer<FileRegion>() {
@Override @Override
public void store(FileRegion token) throws IOException { public void store(FileRegion token) throws IOException {

View File

@ -107,8 +107,7 @@ void writeBlock(long blockId, long offset, long length,
long genStamp, String blockPoolID, long genStamp, String blockPoolID,
BlockAliasMap.Writer<FileRegion> out) throws IOException { BlockAliasMap.Writer<FileRegion> out) throws IOException {
FileStatus s = getFileStatus(); FileStatus s = getFileStatus();
out.store(new FileRegion(blockId, s.getPath(), offset, length, out.store(new FileRegion(blockId, s.getPath(), offset, length, genStamp));
blockPoolID, genStamp));
} }
INode toFile(UGIResolver ugi, BlockResolver blk, INode toFile(UGIResolver ugi, BlockResolver blk,

View File

@ -41,7 +41,6 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
@ -79,6 +78,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR; import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -93,7 +93,6 @@ public class TestNameNodeProvidedImplementation {
final Path BASE = new Path(fBASE.toURI().toString()); final Path BASE = new Path(fBASE.toURI().toString());
final Path NAMEPATH = new Path(BASE, "providedDir"); final Path NAMEPATH = new Path(BASE, "providedDir");
final Path NNDIRPATH = new Path(BASE, "nnDir"); final Path NNDIRPATH = new Path(BASE, "nnDir");
final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv");
final String SINGLEUSER = "usr1"; final String SINGLEUSER = "usr1";
final String SINGLEGROUP = "grp1"; final String SINGLEGROUP = "grp1";
private final int numFiles = 10; private final int numFiles = 10;
@ -101,6 +100,7 @@ public class TestNameNodeProvidedImplementation {
private final String fileSuffix = ".dat"; private final String fileSuffix = ".dat";
private final int baseFileLen = 1024; private final int baseFileLen = 1024;
private long providedDataSize = 0; private long providedDataSize = 0;
private final String bpid = "BP-1234-10.1.1.1-1224";
Configuration conf; Configuration conf;
MiniDFSCluster cluster; MiniDFSCluster cluster;
@ -123,10 +123,10 @@ public void setSeed() throws Exception {
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class); TextFileRegionAliasMap.class, BlockAliasMap.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH, conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR,
BLOCKFILE.toString()); NNDIRPATH.toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH, conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
BLOCKFILE.toString()); new Path(NNDIRPATH, fileNameFromBlockPoolID(bpid)).toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ","); conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ",");
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED, conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED,
@ -189,14 +189,14 @@ void createImage(TreeWalk t, Path out,
opts.output(out.toString()) opts.output(out.toString())
.blocks(aliasMapClass) .blocks(aliasMapClass)
.blockIds(blockIdsClass) .blockIds(blockIdsClass)
.clusterID(clusterID); .clusterID(clusterID)
.blockPoolID(bpid);
try (ImageWriter w = new ImageWriter(opts)) { try (ImageWriter w = new ImageWriter(opts)) {
for (TreePath e : t) { for (TreePath e : t) {
w.accept(e); w.accept(e);
} }
} }
} }
void startCluster(Path nspath, int numDatanodes, void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes, StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode, StorageType[][] storageTypesPerDatanode,
@ -743,9 +743,7 @@ public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
} }
// This test will fail until there is a refactoring of the FileRegion @Test
// (HDFS-12713).
@Test(expected=BlockMissingException.class)
public void testInMemoryAliasMap() throws Exception { public void testInMemoryAliasMap() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS, conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class); FsUGIResolver.class, UGIResolver.class);
@ -758,9 +756,9 @@ public void testInMemoryAliasMap() throws Exception {
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDirectory.getAbsolutePath()); tempDirectory.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true); conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
InMemoryLevelDBAliasMapServer levelDBAliasMapServer = InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init); new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
levelDBAliasMapServer.setConf(conf); levelDBAliasMapServer.setConf(conf);
levelDBAliasMapServer.start(); levelDBAliasMapServer.start();