HDFS-12912. [READ] Fix configuration and implementation of LevelDB-based alias maps
This commit is contained in:
parent
c89b29bd42
commit
80c3fec3a1
@ -59,6 +59,7 @@ public class InMemoryAliasMap implements InMemoryAliasMapProtocol,
|
||||
|
||||
private final DB levelDb;
|
||||
private Configuration conf;
|
||||
private String blockPoolID;
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
@ -79,32 +80,38 @@ static String createPathErrorMessage(String directory) {
|
||||
.toString();
|
||||
}
|
||||
|
||||
public static @Nonnull InMemoryAliasMap init(Configuration conf)
|
||||
throws IOException {
|
||||
public static @Nonnull InMemoryAliasMap init(Configuration conf,
|
||||
String blockPoolID) throws IOException {
|
||||
Options options = new Options();
|
||||
options.createIfMissing(true);
|
||||
String directory =
|
||||
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
|
||||
LOG.info("Attempting to load InMemoryAliasMap from \"{}\"", directory);
|
||||
File path = new File(directory);
|
||||
if (!path.exists()) {
|
||||
File levelDBpath;
|
||||
if (blockPoolID != null) {
|
||||
levelDBpath = new File(directory, blockPoolID);
|
||||
} else {
|
||||
levelDBpath = new File(directory);
|
||||
}
|
||||
if (!levelDBpath.exists()) {
|
||||
String error = createPathErrorMessage(directory);
|
||||
throw new IOException(error);
|
||||
}
|
||||
DB levelDb = JniDBFactory.factory.open(path, options);
|
||||
InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb);
|
||||
DB levelDb = JniDBFactory.factory.open(levelDBpath, options);
|
||||
InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb, blockPoolID);
|
||||
aliasMap.setConf(conf);
|
||||
return aliasMap;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
InMemoryAliasMap(DB levelDb) {
|
||||
InMemoryAliasMap(DB levelDb, String blockPoolID) {
|
||||
this.levelDb = levelDb;
|
||||
this.blockPoolID = blockPoolID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IterationResult list(Optional<Block> marker) throws IOException {
|
||||
return withIterator((DBIterator iterator) -> {
|
||||
try (DBIterator iterator = levelDb.iterator()) {
|
||||
Integer batchSize =
|
||||
conf.getInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE,
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT);
|
||||
@ -130,8 +137,7 @@ public IterationResult list(Optional<Block> marker) throws IOException {
|
||||
} else {
|
||||
return new IterationResult(batch, Optional.empty());
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public @Nonnull Optional<ProvidedStorageLocation> read(@Nonnull Block block)
|
||||
@ -159,7 +165,7 @@ public void write(@Nonnull Block block,
|
||||
|
||||
@Override
|
||||
public String getBlockPoolId() {
|
||||
return null;
|
||||
return blockPoolID;
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
@ -202,21 +208,15 @@ public static byte[] toProtoBufBytes(@Nonnull Block block)
|
||||
return blockOutputStream.toByteArray();
|
||||
}
|
||||
|
||||
private IterationResult withIterator(
|
||||
CheckedFunction<DBIterator, IterationResult> func) throws IOException {
|
||||
try (DBIterator iterator = levelDb.iterator()) {
|
||||
return func.apply(iterator);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* CheckedFunction is akin to {@link java.util.function.Function} but
|
||||
* specifies an IOException.
|
||||
* @param <T> Argument type.
|
||||
* @param <T1> First argument type.
|
||||
* @param <T2> Second argument type.
|
||||
* @param <R> Return type.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CheckedFunction<T, R> {
|
||||
R apply(T t) throws IOException;
|
||||
public interface CheckedFunction2<T1, T2, R> {
|
||||
R apply(T1 t1, T2 t2) throws IOException;
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,7 @@
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
|
||||
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction;
|
||||
import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction2;
|
||||
|
||||
/**
|
||||
* InMemoryLevelDBAliasMapServer is the entry point from the Namenode into
|
||||
@ -51,14 +51,15 @@ public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(InMemoryLevelDBAliasMapServer.class);
|
||||
private final CheckedFunction<Configuration, InMemoryAliasMap> initFun;
|
||||
private final CheckedFunction2<Configuration, String, InMemoryAliasMap>
|
||||
initFun;
|
||||
private RPC.Server aliasMapServer;
|
||||
private Configuration conf;
|
||||
private InMemoryAliasMap aliasMap;
|
||||
private String blockPoolId;
|
||||
|
||||
public InMemoryLevelDBAliasMapServer(
|
||||
CheckedFunction<Configuration, InMemoryAliasMap> initFun,
|
||||
CheckedFunction2<Configuration, String, InMemoryAliasMap> initFun,
|
||||
String blockPoolId) {
|
||||
this.initFun = initFun;
|
||||
this.blockPoolId = blockPoolId;
|
||||
@ -127,7 +128,7 @@ public String getBlockPoolId() {
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
try {
|
||||
this.aliasMap = initFun.apply(conf);
|
||||
this.aliasMap = initFun.apply(conf, blockPoolId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -113,6 +113,11 @@ private static DB createDB(String levelDBPath, boolean createIfMissing,
|
||||
} else {
|
||||
dbFile = new File(levelDBPath);
|
||||
}
|
||||
if (createIfMissing && !dbFile.exists()) {
|
||||
if (!dbFile.mkdirs()) {
|
||||
throw new IOException("Unable to create " + dbFile);
|
||||
}
|
||||
}
|
||||
return factory.open(dbFile, options);
|
||||
}
|
||||
|
||||
|
@ -134,7 +134,7 @@ hadoop org.apache.hadoop.hdfs.server.namenode.FileSystemImage \
|
||||
Assign ownership based on a custom `UGIResolver`, in LevelDB:
|
||||
```
|
||||
hadoop org.apache.hadoop.hdfs.server.namenode.FileSystemImage \
|
||||
-Ddfs.provided.aliasmap.leveldb.path=file:///path/to/leveldb/map/dingos.db \
|
||||
-Ddfs.provided.aliasmap.leveldb.path=/path/to/leveldb/map/dingos.db \
|
||||
-b org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap \
|
||||
-o file:///tmp/name \
|
||||
-u CustomResolver \
|
||||
@ -180,7 +180,7 @@ Datanodes contact this alias map using the `org.apache.hadoop.hdfs.server.aliasm
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.aliasmap.inmemory.leveldb.dir</name>
|
||||
<value>file:///path/to/leveldb/map/dingos.db</value>
|
||||
<value>/path/to/leveldb/map/dingos.db</value>
|
||||
<description>
|
||||
The directory where the leveldb files will be kept
|
||||
</description>
|
||||
|
@ -44,14 +44,17 @@
|
||||
public class ITestInMemoryAliasMap {
|
||||
private InMemoryAliasMap aliasMap;
|
||||
private File tempDirectory;
|
||||
private static String bpid = "bpid-0";
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
tempDirectory = Files.createTempDirectory("seagull").toFile();
|
||||
File temp = Files.createTempDirectory("seagull").toFile();
|
||||
tempDirectory = new File(temp, bpid);
|
||||
tempDirectory.mkdirs();
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
|
||||
tempDirectory.getAbsolutePath());
|
||||
aliasMap = InMemoryAliasMap.init(conf);
|
||||
temp.getAbsolutePath());
|
||||
aliasMap = InMemoryAliasMap.init(conf, bpid);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -39,7 +39,7 @@ public void testInit() {
|
||||
nonExistingDirectory);
|
||||
|
||||
assertThatExceptionOfType(IOException.class)
|
||||
.isThrownBy(() -> InMemoryAliasMap.init(conf)).withMessage(
|
||||
.isThrownBy(() -> InMemoryAliasMap.init(conf, "bpid")).withMessage(
|
||||
InMemoryAliasMap.createPathErrorMessage(nonExistingDirectory));
|
||||
}
|
||||
}
|
@ -66,6 +66,8 @@ public void setUp() throws IOException {
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
|
||||
"localhost:" + port);
|
||||
tempDir = Files.createTempDir();
|
||||
File levelDBDir = new File(tempDir, BPID);
|
||||
levelDBDir.mkdirs();
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
|
||||
tempDir.getAbsolutePath());
|
||||
levelDBAliasMapServer =
|
||||
|
@ -54,7 +54,7 @@ public void setUp() throws IOException {
|
||||
aliasMapMock = mock(InMemoryAliasMap.class);
|
||||
when(aliasMapMock.getBlockPoolId()).thenReturn(bpid);
|
||||
levelDBAliasMapServer = new InMemoryLevelDBAliasMapServer(
|
||||
config -> aliasMapMock, bpid);
|
||||
(config, blockPoolID) -> aliasMapMock, bpid);
|
||||
conf = new Configuration();
|
||||
int port = 9877;
|
||||
|
||||
|
@ -784,6 +784,8 @@ public void testInMemoryAliasMap() throws Exception {
|
||||
"localhost:32445");
|
||||
File tempDirectory =
|
||||
Files.createTempDirectory("in-memory-alias-map").toFile();
|
||||
File leveDBPath = new File(tempDirectory, bpid);
|
||||
leveDBPath.mkdirs();
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
|
||||
tempDirectory.getAbsolutePath());
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
|
||||
|
Loading…
Reference in New Issue
Block a user