HDFS-11902. [READ] Merge BlockFormatProvider and FileRegionProvider.
This commit is contained in:
parent
d6a9a89973
commit
98f5ed5aa3
@ -331,22 +331,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
|
||||
public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;
|
||||
|
||||
public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class";
|
||||
|
||||
public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
|
||||
public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
|
||||
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
|
||||
public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
|
||||
public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class";
|
||||
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter";
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER = "dfs.provided.aliasmap.text.delimiter";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT = ",";
|
||||
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path";
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH = "dfs.provided.aliasmap.text.read.path";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT = "file:///tmp/blocks.csv";
|
||||
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec";
|
||||
public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec";
|
||||
public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path";
|
||||
|
||||
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
|
||||
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
|
||||
|
@ -1,91 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockAlias;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Loads provided blocks from a {@link BlockFormat}.
|
||||
*/
|
||||
public class BlockFormatProvider extends BlockProvider
|
||||
implements Configurable {
|
||||
|
||||
private Configuration conf;
|
||||
private BlockFormat<? extends BlockAlias> blockFormat;
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(BlockFormatProvider.class);
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public void setConf(Configuration conf) {
|
||||
Class<? extends BlockFormat> c = conf.getClass(
|
||||
DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
|
||||
TextFileRegionFormat.class, BlockFormat.class);
|
||||
blockFormat = ReflectionUtils.newInstance(c, conf);
|
||||
LOG.info("Loaded BlockFormat class : " + c.getClass().getName());
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Block> iterator() {
|
||||
try {
|
||||
final BlockFormat.Reader<? extends BlockAlias> reader =
|
||||
blockFormat.getReader(null);
|
||||
|
||||
return new Iterator<Block>() {
|
||||
|
||||
private final Iterator<? extends BlockAlias> inner = reader.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return inner.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Block next() {
|
||||
return inner.next().getBlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to read provided blocks", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,75 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Used to load provided blocks in the {@link BlockManager}.
|
||||
*/
|
||||
public abstract class BlockProvider implements Iterable<Block> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ProvidedStorageMap.class);
|
||||
|
||||
private RwLock lock;
|
||||
private BlockManager bm;
|
||||
private DatanodeStorageInfo storage;
|
||||
private boolean hasDNs = false;
|
||||
|
||||
/**
|
||||
* @param lock the namesystem lock
|
||||
* @param bm block manager
|
||||
* @param storage storage for provided blocks
|
||||
*/
|
||||
void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
|
||||
this.bm = bm;
|
||||
this.lock = lock;
|
||||
this.storage = storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* start the processing of block report for provided blocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
void start(BlockReportContext context) throws IOException {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
if (hasDNs) {
|
||||
return;
|
||||
}
|
||||
if (storage.getBlockReportCount() == 0) {
|
||||
LOG.info("Calling process first blk report from storage: " + storage);
|
||||
// first pass; periodic refresh should call bm.processReport
|
||||
bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
|
||||
} else {
|
||||
bm.processReport(storage, new ProvidedBlockList(iterator()), context);
|
||||
}
|
||||
hasDNs = true;
|
||||
}
|
||||
|
||||
void stop() {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
hasDNs = false;
|
||||
}
|
||||
}
|
@ -40,7 +40,10 @@
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockAlias;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
@ -61,7 +64,11 @@ public class ProvidedStorageMap {
|
||||
LoggerFactory.getLogger(ProvidedStorageMap.class);
|
||||
|
||||
// limit to a single provider for now
|
||||
private final BlockProvider blockProvider;
|
||||
private RwLock lock;
|
||||
private BlockManager bm;
|
||||
private boolean hasDNs = false;
|
||||
private BlockAliasMap aliasMap;
|
||||
|
||||
private final String storageId;
|
||||
private final ProvidedDescriptor providedDescriptor;
|
||||
private final DatanodeStorageInfo providedStorageInfo;
|
||||
@ -79,7 +86,7 @@ public class ProvidedStorageMap {
|
||||
|
||||
if (!providedEnabled) {
|
||||
// disable mapping
|
||||
blockProvider = null;
|
||||
aliasMap = null;
|
||||
providedDescriptor = null;
|
||||
providedStorageInfo = null;
|
||||
return;
|
||||
@ -90,15 +97,17 @@ public class ProvidedStorageMap {
|
||||
providedDescriptor = new ProvidedDescriptor();
|
||||
providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
|
||||
|
||||
// load block reader into storage
|
||||
Class<? extends BlockProvider> fmt = conf.getClass(
|
||||
DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
|
||||
BlockFormatProvider.class, BlockProvider.class);
|
||||
this.bm = bm;
|
||||
this.lock = lock;
|
||||
|
||||
blockProvider = ReflectionUtils.newInstance(fmt, conf);
|
||||
blockProvider.init(lock, bm, providedStorageInfo);
|
||||
LOG.info("Loaded block provider class: " +
|
||||
blockProvider.getClass() + " storage: " + providedStorageInfo);
|
||||
// load block reader into storage
|
||||
Class<? extends BlockAliasMap> aliasMapClass = conf.getClass(
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
||||
aliasMap = ReflectionUtils.newInstance(aliasMapClass, conf);
|
||||
|
||||
LOG.info("Loaded alias map class: " +
|
||||
aliasMap.getClass() + " storage: " + providedStorageInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -114,8 +123,7 @@ DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s,
|
||||
BlockReportContext context) throws IOException {
|
||||
if (providedEnabled && storageId.equals(s.getStorageID())) {
|
||||
if (StorageType.PROVIDED.equals(s.getStorageType())) {
|
||||
// poll service, initiate
|
||||
blockProvider.start(context);
|
||||
processProvidedStorageReport(context);
|
||||
dn.injectStorage(providedStorageInfo);
|
||||
return providedDescriptor.getProvidedStorage(dn, s);
|
||||
}
|
||||
@ -124,6 +132,26 @@ DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s,
|
||||
return dn.getStorageInfo(s.getStorageID());
|
||||
}
|
||||
|
||||
private void processProvidedStorageReport(BlockReportContext context)
|
||||
throws IOException {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
if (hasDNs) {
|
||||
return;
|
||||
}
|
||||
if (providedStorageInfo.getBlockReportCount() == 0) {
|
||||
LOG.info("Calling process first blk report from storage: "
|
||||
+ providedStorageInfo);
|
||||
// first pass; periodic refresh should call bm.processReport
|
||||
bm.processFirstBlockReport(providedStorageInfo,
|
||||
new ProvidedBlockList(aliasMap.getReader(null).iterator()));
|
||||
} else {
|
||||
bm.processReport(providedStorageInfo,
|
||||
new ProvidedBlockList(aliasMap.getReader(null).iterator()),
|
||||
context);
|
||||
}
|
||||
hasDNs = true;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public DatanodeStorageInfo getProvidedStorageInfo() {
|
||||
return providedStorageInfo;
|
||||
@ -137,10 +165,11 @@ public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
|
||||
}
|
||||
|
||||
public void removeDatanode(DatanodeDescriptor dnToRemove) {
|
||||
if (providedDescriptor != null) {
|
||||
if (providedEnabled) {
|
||||
assert lock.hasWriteLock() : "Not holding write lock";
|
||||
int remainingDatanodes = providedDescriptor.remove(dnToRemove);
|
||||
if (remainingDatanodes == 0) {
|
||||
blockProvider.stop();
|
||||
hasDNs = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -443,9 +472,9 @@ boolean removeBlock(BlockInfo b) {
|
||||
*/
|
||||
static class ProvidedBlockList extends BlockListAsLongs {
|
||||
|
||||
private final Iterator<Block> inner;
|
||||
private final Iterator<BlockAlias> inner;
|
||||
|
||||
ProvidedBlockList(Iterator<Block> inner) {
|
||||
ProvidedBlockList(Iterator<BlockAlias> inner) {
|
||||
this.inner = inner;
|
||||
}
|
||||
|
||||
@ -454,7 +483,7 @@ public Iterator<BlockReportReplica> iterator() {
|
||||
return new Iterator<BlockReportReplica>() {
|
||||
@Override
|
||||
public BlockReportReplica next() {
|
||||
return new BlockReportReplica(inner.next());
|
||||
return new BlockReportReplica(inner.next().getBlock());
|
||||
}
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
|
@ -1,88 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* This class is used to read file regions from block maps
|
||||
* specified using delimited text.
|
||||
*/
|
||||
public class TextFileRegionProvider
|
||||
extends FileRegionProvider implements Configurable {
|
||||
|
||||
private Configuration conf;
|
||||
private BlockFormat<FileRegion> fmt;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
fmt = ReflectionUtils.newInstance(
|
||||
conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
|
||||
TextFileRegionFormat.class,
|
||||
BlockFormat.class),
|
||||
conf);
|
||||
((Configurable)fmt).setConf(conf); //redundant?
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<FileRegion> iterator() {
|
||||
try {
|
||||
final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
|
||||
return new Iterator<FileRegion>() {
|
||||
|
||||
private final Iterator<FileRegion> inner = r.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return inner.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileRegion next() {
|
||||
return inner.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to read provided blocks", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh() throws IOException {
|
||||
fmt.refresh();
|
||||
}
|
||||
}
|
@ -15,17 +15,18 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
package org.apache.hadoop.hdfs.server.common.blockaliasmap;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockAlias;
|
||||
|
||||
/**
|
||||
* An abstract class used to read and write block maps for provided blocks.
|
||||
*/
|
||||
public abstract class BlockFormat<T extends BlockAlias> {
|
||||
public abstract class BlockAliasMap<T extends BlockAlias> {
|
||||
|
||||
/**
|
||||
* An abstract class that is used to read {@link BlockAlias}es
|
||||
@ -39,14 +40,19 @@ public static abstract class Reader<U extends BlockAlias>
|
||||
*/
|
||||
public interface Options { }
|
||||
|
||||
/**
|
||||
* @param ident block to resolve
|
||||
* @return BlockAlias correspoding to the provided block.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract U resolve(Block ident) throws IOException;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the reader for the provided block map.
|
||||
* Returns a reader to the alias map.
|
||||
* @param opts reader options
|
||||
* @return {@link Reader} to the block map.
|
||||
* @return {@link Reader} to the alias map.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
|
||||
@ -66,15 +72,15 @@ public interface Options { }
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the writer for the provided block map.
|
||||
* Returns the writer for the alias map.
|
||||
* @param opts writer options.
|
||||
* @return {@link Writer} to the block map.
|
||||
* @return {@link Writer} to the alias map.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
|
||||
|
||||
/**
|
||||
* Refresh based on the underlying block map.
|
||||
* Refresh the alias map.
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void refresh() throws IOException;
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
@ -40,6 +40,8 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
||||
@ -52,15 +54,15 @@
|
||||
* This class is used for block maps stored as text files,
|
||||
* with a specified delimiter.
|
||||
*/
|
||||
public class TextFileRegionFormat
|
||||
extends BlockFormat<FileRegion> implements Configurable {
|
||||
public class TextFileRegionAliasMap
|
||||
extends BlockAliasMap<FileRegion> implements Configurable {
|
||||
|
||||
private Configuration conf;
|
||||
private ReaderOptions readerOpts = TextReader.defaults();
|
||||
private WriterOptions writerOpts = TextWriter.defaults();
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TextFileRegionFormat.class);
|
||||
LoggerFactory.getLogger(TextFileRegionAliasMap.class);
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
readerOpts.setConf(conf);
|
||||
@ -137,27 +139,28 @@ TextWriter createWriter(Path file, CompressionCodec codec, String delim,
|
||||
}
|
||||
|
||||
/**
|
||||
* Class specifying reader options for the {@link TextFileRegionFormat}.
|
||||
* Class specifying reader options for the {@link TextFileRegionAliasMap}.
|
||||
*/
|
||||
public static class ReaderOptions
|
||||
implements TextReader.Options, Configurable {
|
||||
|
||||
private Configuration conf;
|
||||
private String delim =
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
|
||||
private Path file = new Path(
|
||||
new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
|
||||
.toURI().toString());
|
||||
new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT).toURI()
|
||||
.toString());
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
|
||||
String tmpfile =
|
||||
conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH,
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
|
||||
file = new Path(tmpfile);
|
||||
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
|
||||
LOG.info("TextFileRegionFormat: read path " + tmpfile.toString());
|
||||
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
|
||||
LOG.info("TextFileRegionAliasMap: read path " + tmpfile.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -179,7 +182,7 @@ public ReaderOptions delimiter(String delim) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Class specifying writer options for the {@link TextFileRegionFormat}.
|
||||
* Class specifying writer options for the {@link TextFileRegionAliasMap}.
|
||||
*/
|
||||
public static class WriterOptions
|
||||
implements TextWriter.Options, Configurable {
|
||||
@ -187,19 +190,19 @@ public static class WriterOptions
|
||||
private Configuration conf;
|
||||
private String codec = null;
|
||||
private Path file =
|
||||
new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
|
||||
new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);;
|
||||
private String delim =
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
String tmpfile = conf.get(
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH, file.toString());
|
||||
file = new Path(tmpfile);
|
||||
codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
|
||||
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
|
||||
DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
|
||||
codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC);
|
||||
delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
@ -15,23 +15,13 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.hdfs.server.common.blockaliasmap;
|
||||
|
||||
/**
|
||||
* This class is a stub for reading file regions from the block map.
|
||||
* The AliasMap defines mapping of PROVIDED HDFS blocks to data in remote
|
||||
* storage systems.
|
||||
*/
|
||||
public class FileRegionProvider implements Iterable<FileRegion> {
|
||||
@Override
|
||||
public Iterator<FileRegion> iterator() {
|
||||
return Collections.emptyListIterator();
|
||||
}
|
||||
|
||||
public void refresh() throws IOException {
|
||||
return;
|
||||
}
|
||||
}
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
@ -35,9 +35,9 @@
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
@ -68,7 +68,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||
static class ProvidedBlockPoolSlice {
|
||||
private ProvidedVolumeImpl providedVolume;
|
||||
|
||||
private FileRegionProvider provider;
|
||||
private BlockAliasMap<FileRegion> aliasMap;
|
||||
private Configuration conf;
|
||||
private String bpid;
|
||||
private ReplicaMap bpVolumeMap;
|
||||
@ -77,29 +77,35 @@ static class ProvidedBlockPoolSlice {
|
||||
Configuration conf) {
|
||||
this.providedVolume = volume;
|
||||
bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Class<? extends FileRegionProvider> fmt =
|
||||
conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
|
||||
TextFileRegionProvider.class, FileRegionProvider.class);
|
||||
provider = ReflectionUtils.newInstance(fmt, conf);
|
||||
Class<? extends BlockAliasMap> fmt =
|
||||
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
||||
aliasMap = ReflectionUtils.newInstance(fmt, conf);
|
||||
this.conf = conf;
|
||||
this.bpid = bpid;
|
||||
bpVolumeMap.initBlockPool(bpid);
|
||||
LOG.info("Created provider: " + provider.getClass());
|
||||
LOG.info("Created alias map using class: " + aliasMap.getClass());
|
||||
}
|
||||
|
||||
FileRegionProvider getFileRegionProvider() {
|
||||
return provider;
|
||||
BlockAliasMap<FileRegion> getBlockAliasMap() {
|
||||
return aliasMap;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setFileRegionProvider(FileRegionProvider newProvider) {
|
||||
this.provider = newProvider;
|
||||
void setFileRegionProvider(BlockAliasMap<FileRegion> blockAliasMap) {
|
||||
this.aliasMap = blockAliasMap;
|
||||
}
|
||||
|
||||
public void getVolumeMap(ReplicaMap volumeMap,
|
||||
RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
|
||||
throws IOException {
|
||||
Iterator<FileRegion> iter = provider.iterator();
|
||||
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
|
||||
if (reader == null) {
|
||||
LOG.warn("Got null reader from BlockAliasMap " + aliasMap
|
||||
+ "; no blocks will be populated");
|
||||
return;
|
||||
}
|
||||
Iterator<FileRegion> iter = reader.iterator();
|
||||
while (iter.hasNext()) {
|
||||
FileRegion region = iter.next();
|
||||
if (region.getBlockPoolId() != null
|
||||
@ -140,14 +146,20 @@ public void shutdown(BlockListAsLongs blocksListsAsLongs) {
|
||||
public void compileReport(LinkedList<ScanInfo> report,
|
||||
ReportCompiler reportCompiler)
|
||||
throws IOException, InterruptedException {
|
||||
/* refresh the provider and return the list of blocks found.
|
||||
/* refresh the aliasMap and return the list of blocks found.
|
||||
* the assumption here is that the block ids in the external
|
||||
* block map, after the refresh, are consistent with those
|
||||
* from before the refresh, i.e., for blocks which did not change,
|
||||
* the ids remain the same.
|
||||
*/
|
||||
provider.refresh();
|
||||
Iterator<FileRegion> iter = provider.iterator();
|
||||
aliasMap.refresh();
|
||||
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
|
||||
if (reader == null) {
|
||||
LOG.warn("Got null reader from BlockAliasMap " + aliasMap
|
||||
+ "; no blocks will be populated in scan report");
|
||||
return;
|
||||
}
|
||||
Iterator<FileRegion> iter = reader.iterator();
|
||||
while(iter.hasNext()) {
|
||||
reportCompiler.throttle();
|
||||
FileRegion region = iter.next();
|
||||
@ -284,15 +296,15 @@ private class ProviderBlockIteratorImpl
|
||||
|
||||
private String bpid;
|
||||
private String name;
|
||||
private FileRegionProvider provider;
|
||||
private BlockAliasMap<FileRegion> blockAliasMap;
|
||||
private Iterator<FileRegion> blockIterator;
|
||||
private ProvidedBlockIteratorState state;
|
||||
|
||||
ProviderBlockIteratorImpl(String bpid, String name,
|
||||
FileRegionProvider provider) {
|
||||
BlockAliasMap<FileRegion> blockAliasMap) {
|
||||
this.bpid = bpid;
|
||||
this.name = name;
|
||||
this.provider = provider;
|
||||
this.blockAliasMap = blockAliasMap;
|
||||
rewind();
|
||||
}
|
||||
|
||||
@ -330,7 +342,17 @@ public boolean atEnd() {
|
||||
|
||||
@Override
|
||||
public void rewind() {
|
||||
blockIterator = provider.iterator();
|
||||
BlockAliasMap.Reader<FileRegion> reader = null;
|
||||
try {
|
||||
reader = blockAliasMap.getReader(null);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception in getting reader from provided alias map");
|
||||
}
|
||||
if (reader != null) {
|
||||
blockIterator = reader.iterator();
|
||||
} else {
|
||||
blockIterator = null;
|
||||
}
|
||||
state = new ProvidedBlockIteratorState();
|
||||
}
|
||||
|
||||
@ -372,14 +394,14 @@ public void load() throws IOException {
|
||||
@Override
|
||||
public BlockIterator newBlockIterator(String bpid, String name) {
|
||||
return new ProviderBlockIteratorImpl(bpid, name,
|
||||
bpSlices.get(bpid).getFileRegionProvider());
|
||||
bpSlices.get(bpid).getBlockAliasMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockIterator loadBlockIterator(String bpid, String name)
|
||||
throws IOException {
|
||||
ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
|
||||
bpSlices.get(bpid).getFileRegionProvider());
|
||||
bpSlices.get(bpid).getBlockAliasMap());
|
||||
iter.load();
|
||||
return iter;
|
||||
}
|
||||
@ -425,8 +447,8 @@ void getVolumeMap(String bpid, ReplicaMap volumeMap,
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
|
||||
return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
|
||||
BlockAliasMap<FileRegion> getBlockFormat(String bpid) throws IOException {
|
||||
return getProvidedBlockPoolSlice(bpid).getBlockAliasMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -571,12 +593,12 @@ public static boolean containsBlock(URI volumeURI, URI blockURI) {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setFileRegionProvider(String bpid, FileRegionProvider provider)
|
||||
throws IOException {
|
||||
void setFileRegionProvider(String bpid,
|
||||
BlockAliasMap<FileRegion> blockAliasMap) throws IOException {
|
||||
ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp == null) {
|
||||
throw new IOException("block pool " + bpid + " is not found");
|
||||
}
|
||||
bp.setFileRegionProvider(provider);
|
||||
bp.setFileRegionProvider(blockAliasMap);
|
||||
}
|
||||
}
|
||||
|
@ -4629,26 +4629,6 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.block.provider.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value>
|
||||
<description>
|
||||
The class that is used to load provided blocks in the Namenode.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provider.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
|
||||
<description>
|
||||
The class that is used to load information about blocks stored in
|
||||
provided storages.
|
||||
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
|
||||
is used as the default, which expects the blocks to be specified
|
||||
using a delimited text file.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.df.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
|
||||
@ -4666,12 +4646,12 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.blockformat.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
|
||||
<name>dfs.provided.aliasmap.class</name>
|
||||
<value>org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap</value>
|
||||
<description>
|
||||
The class that is used to specify the input format of the blocks on
|
||||
provided storages. The default is
|
||||
org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
|
||||
org.apache.hadoop.hdfs.server.common.TextFileRegionAliasMap which uses
|
||||
file regions to describe blocks. The file regions are specified as a
|
||||
delimited text file. Each file region is a 6-tuple containing the
|
||||
block id, remote file path, offset into file, length of block, the
|
||||
@ -4681,7 +4661,7 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.textprovider.delimiter</name>
|
||||
<name>dfs.provided.aliasmap.text.delimiter</name>
|
||||
<value>,</value>
|
||||
<description>
|
||||
The delimiter used when the provided block map is specified as
|
||||
@ -4690,7 +4670,7 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.textprovider.read.path</name>
|
||||
<name>dfs.provided.aliasmap.text.read.path</name>
|
||||
<value></value>
|
||||
<description>
|
||||
The path specifying the provided block map as a text file, specified as
|
||||
@ -4699,7 +4679,7 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.textprovider.read.codec</name>
|
||||
<name>dfs.provided.aliasmap.text.codec</name>
|
||||
<value></value>
|
||||
<description>
|
||||
The codec used to de-compress the provided block map.
|
||||
@ -4707,7 +4687,7 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.provided.textprovider.write.path</name>
|
||||
<name>dfs.provided.aliasmap.text.write.path</name>
|
||||
<value></value>
|
||||
<description>
|
||||
The path to which the provided block map should be written as a text
|
||||
|
@ -17,20 +17,19 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.util.RwLock;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -47,37 +46,6 @@ public class TestProvidedStorageMap {
|
||||
private RwLock nameSystemLock;
|
||||
private String providedStorageID;
|
||||
|
||||
static class TestBlockProvider extends BlockProvider
|
||||
implements Configurable {
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Block> iterator() {
|
||||
return new Iterator<Block>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return false;
|
||||
}
|
||||
@Override
|
||||
public Block next() {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
providedStorageID = DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT;
|
||||
@ -85,8 +53,9 @@ public void setup() {
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
|
||||
providedStorageID);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
|
||||
conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
|
||||
TestBlockProvider.class, BlockProvider.class);
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TestProvidedImpl.TestFileRegionBlockAliasMap.class,
|
||||
BlockAliasMap.class);
|
||||
|
||||
bm = mock(BlockManager.class);
|
||||
nameSystemLock = mock(RwLock.class);
|
||||
|
@ -15,7 +15,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@ -25,7 +25,8 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.*;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
@ -36,13 +37,13 @@
|
||||
/**
|
||||
* Test for the text based block format for provided block maps.
|
||||
*/
|
||||
public class TestTextBlockFormat {
|
||||
public class TestTextBlockAliasMap {
|
||||
|
||||
static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
|
||||
|
||||
void check(TextWriter.Options opts, final Path vp,
|
||||
final Class<? extends CompressionCodec> vc) throws IOException {
|
||||
TextFileRegionFormat mFmt = new TextFileRegionFormat() {
|
||||
TextFileRegionAliasMap mFmt = new TextFileRegionAliasMap() {
|
||||
@Override
|
||||
public TextWriter createWriter(Path file, CompressionCodec codec,
|
||||
String delim, Configuration conf) throws IOException {
|
||||
@ -63,7 +64,7 @@ public void testWriterOptions() throws Exception {
|
||||
TextWriter.Options opts = TextWriter.defaults();
|
||||
assertTrue(opts instanceof WriterOptions);
|
||||
WriterOptions wopts = (WriterOptions) opts;
|
||||
Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
|
||||
Path def = new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
|
||||
assertEquals(def, wopts.getFile());
|
||||
assertNull(wopts.getCodec());
|
||||
|
@ -52,11 +52,12 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
@ -168,49 +169,66 @@ public void resetBlockCount(int numBlocks) {
|
||||
}
|
||||
|
||||
/**
|
||||
* A simple FileRegion provider for tests.
|
||||
* A simple FileRegion BlockAliasMap for tests.
|
||||
*/
|
||||
public static class TestFileRegionProvider
|
||||
extends FileRegionProvider implements Configurable {
|
||||
public static class TestFileRegionBlockAliasMap
|
||||
extends BlockAliasMap<FileRegion> {
|
||||
|
||||
private Configuration conf;
|
||||
private int minId;
|
||||
private int numBlocks;
|
||||
private Iterator<FileRegion> suppliedIterator;
|
||||
|
||||
TestFileRegionProvider() {
|
||||
TestFileRegionBlockAliasMap() {
|
||||
this(null, MIN_BLK_ID, NUM_PROVIDED_BLKS);
|
||||
}
|
||||
|
||||
TestFileRegionProvider(Iterator<FileRegion> iterator, int minId,
|
||||
int numBlocks) {
|
||||
TestFileRegionBlockAliasMap(Iterator<FileRegion> iterator, int minId,
|
||||
int numBlocks) {
|
||||
this.suppliedIterator = iterator;
|
||||
this.minId = minId;
|
||||
this.numBlocks = numBlocks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<FileRegion> iterator() {
|
||||
if (suppliedIterator == null) {
|
||||
return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
|
||||
} else {
|
||||
return suppliedIterator;
|
||||
}
|
||||
public Reader<FileRegion> getReader(Reader.Options opts)
|
||||
throws IOException {
|
||||
|
||||
BlockAliasMap.Reader<FileRegion> reader =
|
||||
new BlockAliasMap.Reader<FileRegion>() {
|
||||
@Override
|
||||
public Iterator<FileRegion> iterator() {
|
||||
if (suppliedIterator == null) {
|
||||
return new TestFileRegionIterator(providedBasePath, minId,
|
||||
numBlocks);
|
||||
} else {
|
||||
return suppliedIterator;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileRegion resolve(Block ident) throws IOException {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
public Writer<FileRegion> getWriter(Writer.Options opts)
|
||||
throws IOException {
|
||||
// not implemented
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refresh() {
|
||||
//do nothing!
|
||||
public void refresh() throws IOException {
|
||||
// do nothing!
|
||||
}
|
||||
|
||||
public void setMinBlkId(int minId) {
|
||||
@ -359,8 +377,8 @@ public void setUp() throws IOException {
|
||||
new ShortCircuitRegistry(conf);
|
||||
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
|
||||
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
|
||||
TestFileRegionProvider.class, FileRegionProvider.class);
|
||||
this.conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TestFileRegionBlockAliasMap.class, BlockAliasMap.class);
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
|
||||
TestProvidedVolumeDF.class, ProvidedVolumeDF.class);
|
||||
|
||||
@ -496,12 +514,13 @@ public void testRefresh() throws IOException {
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
|
||||
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
|
||||
TestFileRegionProvider provider = (TestFileRegionProvider)
|
||||
vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
||||
TestFileRegionBlockAliasMap testBlockFormat =
|
||||
(TestFileRegionBlockAliasMap) vol
|
||||
.getBlockFormat(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
||||
//equivalent to two new blocks appearing
|
||||
provider.setBlockCount(NUM_PROVIDED_BLKS + 2);
|
||||
testBlockFormat.setBlockCount(NUM_PROVIDED_BLKS + 2);
|
||||
//equivalent to deleting the first block
|
||||
provider.setMinBlkId(MIN_BLK_ID + 1);
|
||||
testBlockFormat.setMinBlkId(MIN_BLK_ID + 1);
|
||||
|
||||
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
|
||||
scanner.reconcile();
|
||||
@ -525,7 +544,7 @@ private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
|
||||
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
|
||||
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
|
||||
new TestFileRegionProvider(fileRegionIterator, minBlockId,
|
||||
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
|
||||
numBlocks));
|
||||
ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
|
||||
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
|
||||
|
@ -29,7 +29,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
@ -103,7 +103,7 @@ public int run(String[] argv) throws Exception {
|
||||
break;
|
||||
case "b":
|
||||
opts.blocks(
|
||||
Class.forName(o.getValue()).asSubclass(BlockFormat.class));
|
||||
Class.forName(o.getValue()).asSubclass(BlockAliasMap.class));
|
||||
break;
|
||||
case "i":
|
||||
opts.blockIds(
|
||||
|
@ -44,8 +44,8 @@
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
||||
@ -88,7 +88,7 @@ public class ImageWriter implements Closeable {
|
||||
private final long startBlock;
|
||||
private final long startInode;
|
||||
private final UGIResolver ugis;
|
||||
private final BlockFormat.Writer<FileRegion> blocks;
|
||||
private final BlockAliasMap.Writer<FileRegion> blocks;
|
||||
private final BlockResolver blockIds;
|
||||
private final Map<Long, DirEntry.Builder> dircache;
|
||||
private final TrackedOutputStream<DigestOutputStream> raw;
|
||||
@ -155,8 +155,8 @@ public ImageWriter(Options opts) throws IOException {
|
||||
ugis = null == opts.ugis
|
||||
? ReflectionUtils.newInstance(opts.ugisClass, opts.getConf())
|
||||
: opts.ugis;
|
||||
BlockFormat<FileRegion> fmt = null == opts.blocks
|
||||
? ReflectionUtils.newInstance(opts.blockFormatClass, opts.getConf())
|
||||
BlockAliasMap<FileRegion> fmt = null == opts.blocks
|
||||
? ReflectionUtils.newInstance(opts.aliasMap, opts.getConf())
|
||||
: opts.blocks;
|
||||
blocks = fmt.getWriter(null);
|
||||
blockIds = null == opts.blockIds
|
||||
@ -509,10 +509,10 @@ public static class Options implements Configurable {
|
||||
private long startInode;
|
||||
private UGIResolver ugis;
|
||||
private Class<? extends UGIResolver> ugisClass;
|
||||
private BlockFormat<FileRegion> blocks;
|
||||
private BlockAliasMap<FileRegion> blocks;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private Class<? extends BlockFormat> blockFormatClass;
|
||||
private Class<? extends BlockAliasMap> aliasMap;
|
||||
private BlockResolver blockIds;
|
||||
private Class<? extends BlockResolver> blockIdsClass;
|
||||
private FSImageCompression compress =
|
||||
@ -524,7 +524,6 @@ protected Options() {
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
//long lastTxn = conf.getLong(LAST_TXN, 0L);
|
||||
String def = new File("hdfs/name").toURI().toString();
|
||||
outdir = new Path(conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, def));
|
||||
startBlock = conf.getLong(FixedBlockResolver.START_BLOCK, (1L << 30) + 1);
|
||||
@ -532,9 +531,9 @@ public void setConf(Configuration conf) {
|
||||
maxdircache = conf.getInt(CACHE_ENTRY, 100);
|
||||
ugisClass = conf.getClass(UGI_CLASS,
|
||||
SingleUGIResolver.class, UGIResolver.class);
|
||||
blockFormatClass = conf.getClass(
|
||||
DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
|
||||
NullBlockFormat.class, BlockFormat.class);
|
||||
aliasMap = conf.getClass(
|
||||
DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
NullBlockAliasMap.class, BlockAliasMap.class);
|
||||
blockIdsClass = conf.getClass(BLOCK_RESOLVER_CLASS,
|
||||
FixedBlockResolver.class, BlockResolver.class);
|
||||
}
|
||||
@ -584,14 +583,14 @@ public Options blockIds(Class<? extends BlockResolver> blockIdsClass) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Options blocks(BlockFormat<FileRegion> blocks) {
|
||||
public Options blocks(BlockAliasMap<FileRegion> blocks) {
|
||||
this.blocks = blocks;
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Options blocks(Class<? extends BlockFormat> blocksClass) {
|
||||
this.blockFormatClass = blocksClass;
|
||||
public Options blocks(Class<? extends BlockAliasMap> blocksClass) {
|
||||
this.aliasMap = blocksClass;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -22,17 +22,16 @@
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat.Reader.Options;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
|
||||
/**
|
||||
* Null sink for region information emitted from FSImage.
|
||||
*/
|
||||
public class NullBlockFormat extends BlockFormat<FileRegion> {
|
||||
public class NullBlockAliasMap extends BlockAliasMap<FileRegion> {
|
||||
|
||||
@Override
|
||||
public Reader<FileRegion> getReader(Options opts) throws IOException {
|
||||
public Reader<FileRegion> getReader(Reader.Options opts) throws IOException {
|
||||
return new Reader<FileRegion>() {
|
||||
@Override
|
||||
public Iterator<FileRegion> iterator() {
|
@ -24,8 +24,8 @@
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegion;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeFile;
|
||||
@ -70,7 +70,7 @@ void accept(long id) {
|
||||
}
|
||||
|
||||
public INode toINode(UGIResolver ugi, BlockResolver blk,
|
||||
BlockFormat.Writer<FileRegion> out, String blockPoolID)
|
||||
BlockAliasMap.Writer<FileRegion> out, String blockPoolID)
|
||||
throws IOException {
|
||||
if (stat.isFile()) {
|
||||
return toFile(ugi, blk, out, blockPoolID);
|
||||
@ -101,14 +101,14 @@ public int hashCode() {
|
||||
|
||||
void writeBlock(long blockId, long offset, long length,
|
||||
long genStamp, String blockPoolID,
|
||||
BlockFormat.Writer<FileRegion> out) throws IOException {
|
||||
BlockAliasMap.Writer<FileRegion> out) throws IOException {
|
||||
FileStatus s = getFileStatus();
|
||||
out.store(new FileRegion(blockId, s.getPath(), offset, length,
|
||||
blockPoolID, genStamp));
|
||||
}
|
||||
|
||||
INode toFile(UGIResolver ugi, BlockResolver blk,
|
||||
BlockFormat.Writer<FileRegion> out, String blockPoolID)
|
||||
BlockAliasMap.Writer<FileRegion> out, String blockPoolID)
|
||||
throws IOException {
|
||||
final FileStatus s = getFileStatus();
|
||||
// TODO should this store resolver's user/group?
|
||||
|
@ -44,13 +44,9 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.BlockFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
|
||||
import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
@ -103,18 +99,13 @@ public void setSeed() throws Exception {
|
||||
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
|
||||
|
||||
conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
|
||||
BlockFormatProvider.class, BlockProvider.class);
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
|
||||
TextFileRegionProvider.class, FileRegionProvider.class);
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
|
||||
TextFileRegionFormat.class, BlockFormat.class);
|
||||
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH,
|
||||
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH,
|
||||
BLOCKFILE.toString());
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH,
|
||||
BLOCKFILE.toString());
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
|
||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ",");
|
||||
|
||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED,
|
||||
new File(NAMEPATH.toUri()).toString());
|
||||
@ -167,7 +158,7 @@ void createImage(TreeWalk t, Path out,
|
||||
ImageWriter.Options opts = ImageWriter.defaults();
|
||||
opts.setConf(conf);
|
||||
opts.output(out.toString())
|
||||
.blocks(TextFileRegionFormat.class)
|
||||
.blocks(TextFileRegionAliasMap.class)
|
||||
.blockIds(blockIdsClass);
|
||||
try (ImageWriter w = new ImageWriter(opts)) {
|
||||
for (TreePath e : t) {
|
||||
|
Loading…
Reference in New Issue
Block a user