HDFS-11190. [READ] Namenode support for data stored in external stores.

Virajith Jalaparti 2017-04-21 11:12:36 -07:00 committed by Chris Douglas
parent 8da3a6e314
commit d65df0f273
15 changed files with 1292 additions and 86 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import com.google.common.base.Preconditions;
@ -62,40 +63,50 @@ public class LocatedBlock {
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
// By default, startOffset is unknown(-1) and corrupt is false.
this(b, locs, null, null, -1, false, EMPTY_LOCS);
this(b, convert(locs, null, null), null, null, -1, false, EMPTY_LOCS);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes) {
this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS);
this(b, convert(locs, storageIDs, storageTypes),
storageIDs, storageTypes, -1, false, EMPTY_LOCS);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
StorageType[] storageTypes, long startOffset,
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes, long startOffset,
boolean corrupt, DatanodeInfo[] cachedLocs) {
this(b, convert(locs, storageIDs, storageTypes),
storageIDs, storageTypes, startOffset, corrupt,
null == cachedLocs || 0 == cachedLocs.length ? EMPTY_LOCS : cachedLocs);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfoWithStorage[] locs,
String[] storageIDs, StorageType[] storageTypes, long startOffset,
boolean corrupt, DatanodeInfo[] cachedLocs) {
this.b = b;
this.offset = startOffset;
this.corrupt = corrupt;
if (locs==null) {
this.locs = EMPTY_LOCS;
} else {
this.locs = new DatanodeInfoWithStorage[locs.length];
for(int i = 0; i < locs.length; i++) {
DatanodeInfo di = locs[i];
DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
storageIDs != null ? storageIDs[i] : null,
storageTypes != null ? storageTypes[i] : null);
this.locs[i] = storage;
}
}
this.locs = null == locs ? EMPTY_LOCS : locs;
this.storageIDs = storageIDs;
this.storageTypes = storageTypes;
this.cachedLocs = null == cachedLocs || 0 == cachedLocs.length
? EMPTY_LOCS
: cachedLocs;
}
if (cachedLocs == null || cachedLocs.length == 0) {
this.cachedLocs = EMPTY_LOCS;
} else {
this.cachedLocs = cachedLocs;
private static DatanodeInfoWithStorage[] convert(
DatanodeInfo[] infos, String[] storageIDs, StorageType[] storageTypes) {
if (null == infos) {
return EMPTY_LOCS;
}
DatanodeInfoWithStorage[] ret = new DatanodeInfoWithStorage[infos.length];
for(int i = 0; i < infos.length; i++) {
ret[i] = new DatanodeInfoWithStorage(infos[i],
storageIDs != null ? storageIDs[i] : null,
storageTypes != null ? storageTypes[i] : null);
}
return ret;
}
public Token<BlockTokenIdentifier> getBlockToken() {
@ -145,6 +156,51 @@ public void updateCachedStorageInfo() {
}
}
/**
* Comparator that ensures that a PROVIDED storage type is greater than
* any other storage type. Any other storage types are considered equal.
*/
private class ProvidedLastComparator
implements Comparator<DatanodeInfoWithStorage> {
@Override
public int compare(DatanodeInfoWithStorage dns1,
DatanodeInfoWithStorage dns2) {
if (StorageType.PROVIDED.equals(dns1.getStorageType())
&& !StorageType.PROVIDED.equals(dns2.getStorageType())) {
return 1;
}
if (!StorageType.PROVIDED.equals(dns1.getStorageType())
&& StorageType.PROVIDED.equals(dns2.getStorageType())) {
return -1;
}
// Storage types of dns1 and dns2 are now both provided or not provided;
// thus, are essentially equal for the purpose of this comparator.
return 0;
}
}
/**
* Moves all locations that have {@link StorageType}
* {@code PROVIDED} to the end of the locations array, without
* changing the relative ordering of the remaining locations.
* Only the first {@code activeLen} locations are considered.
* The caller must immediately invoke {@link
* org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
* to update the cached Storage ID/Type arrays.
* @param activeLen the number of locations, from the start of the array,
* that should be considered
*/
public void moveProvidedToEnd(int activeLen) {
if (activeLen <= 0) {
return;
}
// as this is a stable sort, for elements that are equal,
// the current order of the elements is maintained
Arrays.sort(locs, 0,
(activeLen < locs.length) ? activeLen : locs.length,
new ProvidedLastComparator());
}
public long getStartOffset() {
return offset;
}
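
Aside (not part of the patch): the effect of ProvidedLastComparator together with the stable partial sort in moveProvidedToEnd can be shown with a small self-contained sketch; the Storage enum below is only a stand-in for StorageType.

import java.util.Arrays;
import java.util.Comparator;

public class ProvidedLastSortSketch {
  enum Storage { DISK, SSD, PROVIDED }

  public static void main(String[] args) {
    Storage[] locs = {
        Storage.PROVIDED, Storage.DISK, Storage.SSD,
        Storage.PROVIDED, Storage.DISK };
    int activeLen = 4; // only the first four entries are "active"
    Comparator<Storage> providedLast = (a, b) -> {
      if (a == Storage.PROVIDED && b != Storage.PROVIDED) {
        return 1;
      }
      if (a != Storage.PROVIDED && b == Storage.PROVIDED) {
        return -1;
      }
      return 0; // all other pairs compare equal, so a stable sort keeps their order
    };
    // Sort only the active prefix, exactly as moveProvidedToEnd does.
    Arrays.sort(locs, 0, Math.min(activeLen, locs.length), providedLast);
    // Prints [DISK, SSD, PROVIDED, PROVIDED, DISK]: PROVIDED entries moved to
    // the end of the active prefix, everything else kept its relative order.
    System.out.println(Arrays.toString(locs));
  }
}

Because all non-PROVIDED pairs compare as equal, the ordering produced earlier by sortByDistance in DatanodeManager is preserved.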

View File

@ -328,6 +328,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.edits.asynclogging";
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true;
public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;
public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class";
public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
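
A minimal sketch (not from the patch) of setting these keys programmatically; the values used are the defaults documented in hdfs-default.xml further below, and the sketch assumes the classes added by this commit are on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ProvidedConfSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Enable Namenode support for provided storage (defaults to false).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    // Storage ID reserved for the provided store (defaults to DS-PROVIDED).
    conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
        DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
    // Provider that feeds provided block metadata to the Namenode.
    conf.set(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
        "org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider");
    System.out.println("provided storage enabled: "
        + conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
            DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT));
  }
}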

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.BlockAlias;
import org.apache.hadoop.hdfs.server.common.BlockFormat;
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Loads provided blocks from a {@link BlockFormat}.
*/
public class BlockFormatProvider extends BlockProvider
implements Configurable {
private Configuration conf;
private BlockFormat<? extends BlockAlias> blockFormat;
public static final Logger LOG =
LoggerFactory.getLogger(BlockFormatProvider.class);
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void setConf(Configuration conf) {
Class<? extends BlockFormat> c = conf.getClass(
DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
TextFileRegionFormat.class, BlockFormat.class);
blockFormat = ReflectionUtils.newInstance(c, conf);
LOG.info("Loaded BlockFormat class : " + c.getClass().getName());
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public Iterator<Block> iterator() {
try {
final BlockFormat.Reader<? extends BlockAlias> reader =
blockFormat.getReader(null);
return new Iterator<Block>() {
private final Iterator<? extends BlockAlias> inner = reader.iterator();
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public Block next() {
return inner.next().getBlock();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
} catch (IOException e) {
throw new RuntimeException("Failed to read provided blocks", e);
}
}
}
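
A hedged usage sketch (not part of the patch): with this commit applied and a block map readable by the configured BlockFormat (TextFileRegionFormat by default), the provider can be iterated directly, since BlockProvider implements Iterable<Block>.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;

public class BlockFormatProviderSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    BlockFormatProvider provider = new BlockFormatProvider();
    provider.setConf(conf); // instantiates the configured BlockFormat
    // Each BlockAlias read from the block map is surfaced as a Block.
    for (Block block : provider) {
      System.out.println("provided block: " + block);
    }
  }
}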

View File

@ -435,6 +435,9 @@ public long getTotalECBlockGroups() {
*/
private final short minReplicationToBeInMaintenance;
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;
public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@ -467,6 +470,8 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
@ -1144,7 +1149,7 @@ public LocatedBlock convertLastBlockToUnderConstruction(
final long fileLength = bc.computeContentSummary(
getStoragePolicySuite()).getLength();
final long pos = fileLength - lastBlock.getNumBytes();
return createLocatedBlock(lastBlock, pos,
return createLocatedBlock(null, lastBlock, pos,
BlockTokenIdentifier.AccessMode.WRITE);
}
@ -1165,8 +1170,10 @@ private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
return locations;
}
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
final long offset, final long length, final int nrBlocksToReturn,
private void createLocatedBlockList(
LocatedBlockBuilder locatedBlocks,
final BlockInfo[] blocks,
final long offset, final long length,
final AccessMode mode) throws IOException {
int curBlk;
long curPos = 0, blkSize = 0;
@ -1181,21 +1188,22 @@ private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
return Collections.emptyList();
return;
long endOff = offset + length;
List<LocatedBlock> results = new ArrayList<>(blocks.length);
do {
results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
locatedBlocks.addBlock(
createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
&& curBlk < blocks.length
&& results.size() < nrBlocksToReturn);
return results;
&& !locatedBlocks.isBlockMax());
return;
}
private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
final BlockInfo[] blocks,
final long endPos, final AccessMode mode) throws IOException {
int curBlk;
long curPos = 0;
@ -1208,12 +1216,13 @@ private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
curPos += blkSize;
}
return createLocatedBlock(blocks[curBlk], curPos, mode);
return createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode);
}
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
final AccessMode mode) throws IOException {
final LocatedBlock lb = createLocatedBlock(blk, pos);
private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
final BlockInfo blk, final long pos, final AccessMode mode)
throws IOException {
final LocatedBlock lb = createLocatedBlock(locatedBlocks, blk, pos);
if (mode != null) {
setBlockToken(lb, mode);
}
@ -1221,21 +1230,24 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
}
/** @return a LocatedBlock for the given block */
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
throws IOException {
private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
final BlockInfo blk, final long pos) throws IOException {
if (!blk.isComplete()) {
final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
if (blk.isStriped()) {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
//TODO use locatedBlocks builder??
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false);
} else {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
return newLocatedBlock(eb, storages, pos, false);
return null == locatedBlocks
? newLocatedBlock(eb, storages, pos, false)
: locatedBlocks.newLocatedBlock(eb, storages, pos, false);
}
}
@ -1299,9 +1311,10 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
return blockIndices == null ?
newLocatedBlock(eb, machines, pos, isCorrupt) :
newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
return blockIndices == null
? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt)
: locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt)
: newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}
/** Create a LocatedBlocks. */
@ -1323,27 +1336,31 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
LOG.debug("blocks = {}", java.util.Arrays.asList(blocks));
}
final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
blocks, offset, length, Integer.MAX_VALUE, mode);
final LocatedBlock lastlb;
final boolean isComplete;
LocatedBlockBuilder locatedBlocks = providedStorageMap
.newLocatedBlocks(Integer.MAX_VALUE)
.fileLength(fileSizeExcludeBlocksUnderConstruction)
.lastUC(isFileUnderConstruction)
.encryption(feInfo)
.erasureCoding(ecPolicy);
createLocatedBlockList(locatedBlocks, blocks, offset, length, mode);
if (!inSnapshot) {
final BlockInfo last = blocks[blocks.length - 1];
final long lastPos = last.isComplete()?
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
: fileSizeExcludeBlocksUnderConstruction;
lastlb = createLocatedBlock(last, lastPos, mode);
isComplete = last.isComplete();
locatedBlocks
.lastBlock(createLocatedBlock(locatedBlocks, last, lastPos, mode))
.lastComplete(last.isComplete());
} else {
lastlb = createLocatedBlock(blocks,
fileSizeExcludeBlocksUnderConstruction, mode);
isComplete = true;
locatedBlocks
.lastBlock(createLocatedBlock(locatedBlocks, blocks,
fileSizeExcludeBlocksUnderConstruction, mode))
.lastComplete(true);
}
LocatedBlocks locations = new LocatedBlocks(
fileSizeExcludeBlocksUnderConstruction,
isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
ecPolicy);
LocatedBlocks locations = locatedBlocks.build();
// Set caching information for the located blocks.
CacheManager cm = namesystem.getCacheManager();
if (cm != null) {
@ -2442,7 +2459,10 @@ public boolean processReport(final DatanodeID nodeID,
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
// !#! Register DN with provided storage, not with storage owned by DN
// !#! DN should still have a ref to the DNStorageInfo
DatanodeStorageInfo storageInfo =
providedStorageMap.getStorage(node, storage);
if (storageInfo == null) {
// We handle this for backwards compatibility.
@ -2474,9 +2494,12 @@ public boolean processReport(final DatanodeID nodeID,
nodeID.getDatanodeUuid());
processFirstBlockReport(storageInfo, newReport);
} else {
invalidatedBlocks = processReport(storageInfo, newReport, context);
// Block reports for provided storage are not
// maintained by DN heartbeats
if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
invalidatedBlocks = processReport(storageInfo, newReport, context);
}
}
storageInfo.receivedBlockReport();
} finally {
endTime = Time.monotonicNow();
@ -2690,7 +2713,7 @@ public void markBlockReplicasAsCorrupt(Block oldBlock,
* @param report - the initial block report, to be processed
* @throws IOException
*/
private void processFirstBlockReport(
void processFirstBlockReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
import org.apache.hadoop.hdfs.util.RwLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used to load provided blocks in the {@link BlockManager}.
*/
public abstract class BlockProvider implements Iterable<Block> {
private static final Logger LOG =
LoggerFactory.getLogger(ProvidedStorageMap.class);
private RwLock lock;
private BlockManager bm;
private DatanodeStorageInfo storage;
private boolean hasDNs = false;
/**
* @param lock the namesystem lock
* @param bm block manager
* @param storage storage for provided blocks
*/
void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
this.bm = bm;
this.lock = lock;
this.storage = storage;
}
/**
* Start the processing of the block report for provided blocks.
* @throws IOException
*/
void start() throws IOException {
assert lock.hasWriteLock() : "Not holding write lock";
if (hasDNs) {
return;
}
LOG.info("Calling process first blk report from storage: " + storage);
// first pass; periodic refresh should call bm.processReport
bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
hasDNs = true;
}
}
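
To make the contract concrete, here is a hypothetical in-memory subclass (illustrative only, not part of this commit). A class like this could be plugged in through dfs.namenode.block.provider.class; when the provided storage is first reported, start() would hand its blocks to the BlockManager as the first block report.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;

public class StaticBlockProvider extends BlockProvider {
  // Two hard-coded blocks: block id, length in bytes, generation stamp.
  private final Block[] blocks = {
      new Block(1L, 1024L, 1001L),
      new Block(2L, 2048L, 1001L)
  };

  @Override
  public Iterator<Block> iterator() {
    return Arrays.asList(blocks).iterator();
  }
}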

View File

@ -82,6 +82,12 @@ public static BlockStoragePolicySuite createDefaultSuite() {
HdfsConstants.COLD_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
StorageType.EMPTY_ARRAY);
final byte providedId = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
policies[providedId] = new BlockStoragePolicy(providedId,
HdfsConstants.PROVIDED_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
return new BlockStoragePolicySuite(hotId, policies);
}

View File

@ -151,7 +151,7 @@ public Type getType() {
private final LeavingServiceStatus leavingServiceStatus =
new LeavingServiceStatus();
private final Map<String, DatanodeStorageInfo> storageMap =
protected final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
/**
@ -322,6 +322,12 @@ public StorageReport[] getStorageReports() {
boolean hasStaleStorages() {
synchronized (storageMap) {
for (DatanodeStorageInfo storage : storageMap.values()) {
if (StorageType.PROVIDED.equals(storage.getStorageType())) {
// Verifying that the provided storage participated in this heartbeat
// would also require passing the DatanodeDescriptor,
// e.g., storageInfo.verifyBlockReportId(this, curBlockReportId)
continue;
}
if (storage.areBlockContentsStale()) {
return true;
}
@ -443,17 +449,22 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
for (StorageReport report : reports) {
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
totalNonDfsUsed += report.getNonDfsUsed();
if (StorageType.PROVIDED.equals(
report.getStorage().getStorageType())) {
continue;
}
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
failedStorageInfos.remove(storage);
}
storage.receivedHeartbeat(report);
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
totalNonDfsUsed += report.getNonDfsUsed();
}
// Update total metrics for the node.
@ -474,6 +485,17 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
}
}
void injectStorage(DatanodeStorageInfo s) {
synchronized (storageMap) {
DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
if (null == storage) {
storageMap.put(s.getStorageID(), s);
} else {
assert storage == s : "found " + storage + " expected " + s;
}
}
}
/**
* Remove stale storages from storageMap. We must not remove any storages
* as long as they have associated block replicas.

View File

@ -532,6 +532,8 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
} else {
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
}
//move PROVIDED storage to the end to prefer local replicas.
lb.moveProvidedToEnd(activeLen);
// must update cache since we modified locations array
lb.updateCachedStorageInfo();
}

View File

@ -172,6 +172,10 @@ void setState(State state) {
this.state = state;
}
void setHeartbeatedSinceFailover(boolean value) {
heartbeatedSinceFailover = value;
}
boolean areBlocksOnFailedStorage() {
return getState() == State.FAILED && !blocks.isEmpty();
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class LocatedBlockBuilder {
protected long flen;
protected List<LocatedBlock> blocks = Collections.<LocatedBlock>emptyList();
protected boolean isUC;
protected LocatedBlock last;
protected boolean lastComplete;
protected FileEncryptionInfo feInfo;
private final int maxBlocks;
protected ErasureCodingPolicy ecPolicy;
LocatedBlockBuilder(int maxBlocks) {
this.maxBlocks = maxBlocks;
}
boolean isBlockMax() {
return blocks.size() >= maxBlocks;
}
LocatedBlockBuilder fileLength(long fileLength) {
flen = fileLength;
return this;
}
LocatedBlockBuilder addBlock(LocatedBlock block) {
if (blocks.isEmpty()) {
blocks = new ArrayList<>();
}
blocks.add(block);
return this;
}
// return new block so tokens can be set
LocatedBlock newLocatedBlock(ExtendedBlock eb,
DatanodeStorageInfo[] storage,
long pos, boolean isCorrupt) {
LocatedBlock blk =
BlockManager.newLocatedBlock(eb, storage, pos, isCorrupt);
return blk;
}
LocatedBlockBuilder lastUC(boolean underConstruction) {
isUC = underConstruction;
return this;
}
LocatedBlockBuilder lastBlock(LocatedBlock block) {
last = block;
return this;
}
LocatedBlockBuilder lastComplete(boolean complete) {
lastComplete = complete;
return this;
}
LocatedBlockBuilder encryption(FileEncryptionInfo fileEncryptionInfo) {
feInfo = fileEncryptionInfo;
return this;
}
LocatedBlockBuilder erasureCoding(ErasureCodingPolicy codingPolicy) {
ecPolicy = codingPolicy;
return this;
}
LocatedBlocks build(DatanodeDescriptor client) {
return build();
}
LocatedBlocks build() {
return new LocatedBlocks(flen, isUC, blocks, last,
lastComplete, feInfo, ecPolicy);
}
}
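
For reference, a minimal usage sketch (illustrative only) mirroring the call sequence used by BlockManager#createLocatedBlocks earlier in this diff; it lives in the same package because the builder is package-private, and the LocatedBlock arguments are assumed to come from elsewhere.

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.List;

import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

final class LocatedBlockBuilderUsageSketch {
  static LocatedBlocks assemble(long fileSize, boolean underConstruction,
      List<LocatedBlock> located, LocatedBlock last, boolean lastComplete,
      FileEncryptionInfo feInfo, ErasureCodingPolicy ecPolicy) {
    LocatedBlockBuilder builder = new LocatedBlockBuilder(Integer.MAX_VALUE)
        .fileLength(fileSize)
        .lastUC(underConstruction)
        .encryption(feInfo)
        .erasureCoding(ecPolicy);
    for (LocatedBlock lb : located) {
      if (builder.isBlockMax()) { // stop once the block limit is hit
        break;
      }
      builder.addBlock(lb);
    }
    return builder.lastBlock(last)
        .lastComplete(lastComplete)
        .build();
  }
}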

View File

@ -0,0 +1,427 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
/**
* This class allows us to manage and multiplex between storages local to
* datanodes, and provided storage.
*/
public class ProvidedStorageMap {
private static final Logger LOG =
LoggerFactory.getLogger(ProvidedStorageMap.class);
// limit to a single provider for now
private final BlockProvider blockProvider;
private final String storageId;
private final ProvidedDescriptor providedDescriptor;
private final DatanodeStorageInfo providedStorageInfo;
private boolean providedEnabled;
ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
throws IOException {
storageId = conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
providedEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT);
if (!providedEnabled) {
// disable mapping
blockProvider = null;
providedDescriptor = null;
providedStorageInfo = null;
return;
}
DatanodeStorage ds = new DatanodeStorage(
storageId, State.NORMAL, StorageType.PROVIDED);
providedDescriptor = new ProvidedDescriptor();
providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
// load block reader into storage
Class<? extends BlockProvider> fmt = conf.getClass(
DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
BlockFormatProvider.class, BlockProvider.class);
blockProvider = ReflectionUtils.newInstance(fmt, conf);
blockProvider.init(lock, bm, providedStorageInfo);
LOG.info("Loaded block provider class: " +
blockProvider.getClass() + " storage: " + providedStorageInfo);
}
/**
* @param dn datanode descriptor
* @param s data node storage
* @return the {@link DatanodeStorageInfo} for the specified datanode.
* If {@code s} corresponds to a provided storage, the storage info
* representing provided storage is returned.
* @throws IOException
*/
DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
throws IOException {
if (providedEnabled && storageId.equals(s.getStorageID())) {
if (StorageType.PROVIDED.equals(s.getStorageType())) {
// poll service, initiate
blockProvider.start();
dn.injectStorage(providedStorageInfo);
return providedDescriptor.getProvidedStorage(dn, s);
}
LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn);
}
return dn.getStorageInfo(s.getStorageID());
}
public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
if (!providedEnabled) {
return new LocatedBlockBuilder(maxValue);
}
return new ProvidedBlocksBuilder(maxValue);
}
/**
* Builder used for creating {@link LocatedBlocks} when a block is provided.
*/
class ProvidedBlocksBuilder extends LocatedBlockBuilder {
private ShadowDatanodeInfoWithStorage pending;
ProvidedBlocksBuilder(int maxBlocks) {
super(maxBlocks);
pending = new ShadowDatanodeInfoWithStorage(
providedDescriptor, storageId);
}
@Override
LocatedBlock newLocatedBlock(ExtendedBlock eb,
DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
DatanodeInfoWithStorage[] locs =
new DatanodeInfoWithStorage[storages.length];
String[] sids = new String[storages.length];
StorageType[] types = new StorageType[storages.length];
for (int i = 0; i < storages.length; ++i) {
sids[i] = storages[i].getStorageID();
types[i] = storages[i].getStorageType();
if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
locs[i] = pending;
} else {
locs[i] = new DatanodeInfoWithStorage(
storages[i].getDatanodeDescriptor(), sids[i], types[i]);
}
}
return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
}
@Override
LocatedBlocks build(DatanodeDescriptor client) {
// TODO: to support multiple provided storages, need to pass/maintain map
// set all fields of pending DatanodeInfo
List<String> excludedUUids = new ArrayList<String>();
for (LocatedBlock b: blocks) {
DatanodeInfo[] infos = b.getLocations();
StorageType[] types = b.getStorageTypes();
for (int i = 0; i < types.length; i++) {
if (!StorageType.PROVIDED.equals(types[i])) {
excludedUUids.add(infos[i].getDatanodeUuid());
}
}
}
DatanodeDescriptor dn = providedDescriptor.choose(client, excludedUUids);
if (dn == null) {
dn = providedDescriptor.choose(client);
}
pending.replaceInternal(dn);
return new LocatedBlocks(
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
}
@Override
LocatedBlocks build() {
return build(providedDescriptor.chooseRandom());
}
}
/**
* A placeholder {@link DatanodeInfoWithStorage} that represents provided
* storage until a concrete datanode is chosen.
*/
static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
private String shadowUuid;
ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
super(d, storageId, StorageType.PROVIDED);
}
@Override
public String getDatanodeUuid() {
return shadowUuid;
}
public void setDatanodeUuid(String uuid) {
shadowUuid = uuid;
}
void replaceInternal(DatanodeDescriptor dn) {
updateRegInfo(dn); // overwrite DatanodeID (except UUID)
setDatanodeUuid(dn.getDatanodeUuid());
setCapacity(dn.getCapacity());
setDfsUsed(dn.getDfsUsed());
setRemaining(dn.getRemaining());
setBlockPoolUsed(dn.getBlockPoolUsed());
setCacheCapacity(dn.getCacheCapacity());
setCacheUsed(dn.getCacheUsed());
setLastUpdate(dn.getLastUpdate());
setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
setXceiverCount(dn.getXceiverCount());
setNetworkLocation(dn.getNetworkLocation());
adminState = dn.getAdminState();
setUpgradeDomain(dn.getUpgradeDomain());
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public int hashCode() {
return super.hashCode();
}
}
/**
* A synthetic DatanodeDescriptor used to track the datanodes that have
* provided storage.
* NOTE: never resolved through registerDatanode, so not in the topology.
*/
static class ProvidedDescriptor extends DatanodeDescriptor {
private final NavigableMap<String, DatanodeDescriptor> dns =
new ConcurrentSkipListMap<>();
ProvidedDescriptor() {
super(new DatanodeID(
null, // String ipAddr,
null, // String hostName,
UUID.randomUUID().toString(), // String datanodeUuid,
0, // int xferPort,
0, // int infoPort,
0, // int infoSecurePort,
0)); // int ipcPort
}
DatanodeStorageInfo getProvidedStorage(
DatanodeDescriptor dn, DatanodeStorage s) {
dns.put(dn.getDatanodeUuid(), dn);
// TODO: maintain separate RPC ident per dn
return storageMap.get(s.getStorageID());
}
DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
assert null == storageMap.get(ds.getStorageID());
DatanodeStorageInfo storage = new DatanodeStorageInfo(this, ds);
storage.setHeartbeatedSinceFailover(true);
storageMap.put(storage.getStorageID(), storage);
return storage;
}
DatanodeDescriptor choose(DatanodeDescriptor client) {
// exact match for now
DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
if (null == dn) {
dn = chooseRandom();
}
return dn;
}
DatanodeDescriptor choose(DatanodeDescriptor client,
List<String> excludedUUids) {
// exact match for now
DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
dn = null;
Set<String> exploredUUids = new HashSet<String>();
while(exploredUUids.size() < dns.size()) {
Map.Entry<String, DatanodeDescriptor> d =
dns.ceilingEntry(UUID.randomUUID().toString());
if (null == d) {
d = dns.firstEntry();
}
String uuid = d.getValue().getDatanodeUuid();
//this node has already been explored, and was not selected earlier
if (exploredUUids.contains(uuid)) {
continue;
}
exploredUUids.add(uuid);
//this node has been excluded
if (excludedUUids.contains(uuid)) {
continue;
}
return dns.get(uuid);
}
}
return dn;
}
DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
// TODO: Currently this is not uniformly random;
// skewed toward sparse sections of the ids
Set<DatanodeDescriptor> excludedNodes =
new HashSet<DatanodeDescriptor>();
if (excludedStorages != null) {
for (int i= 0; i < excludedStorages.length; i++) {
LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
}
}
Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
while(exploredNodes.size() < dns.size()) {
Map.Entry<String, DatanodeDescriptor> d =
dns.ceilingEntry(UUID.randomUUID().toString());
if (null == d) {
d = dns.firstEntry();
}
DatanodeDescriptor node = d.getValue();
//this node has already been explored, and was not selected earlier
if (exploredNodes.contains(node)) {
continue;
}
exploredNodes.add(node);
//this node has been excluded
if (excludedNodes.contains(node)) {
continue;
}
return node;
}
return null;
}
DatanodeDescriptor chooseRandom() {
return chooseRandom(null);
}
@Override
void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
// pick a random datanode, delegate to it
DatanodeDescriptor node = chooseRandom(targets);
if (node != null) {
node.addBlockToBeReplicated(block, targets);
} else {
LOG.error("Cannot find a source node to replicate block: "
+ block + " from");
}
}
@Override
public boolean equals(Object obj) {
return (this == obj) || super.equals(obj);
}
@Override
public int hashCode() {
return super.hashCode();
}
}
/**
* Used to emulate block reports for provided blocks.
*/
static class ProvidedBlockList extends BlockListAsLongs {
private final Iterator<Block> inner;
ProvidedBlockList(Iterator<Block> inner) {
this.inner = inner;
}
@Override
public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() {
@Override
public BlockReportReplica next() {
return new BlockReportReplica(inner.next());
}
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public int getNumberOfBlocks() {
// VERIFY: only printed for debugging
return -1;
}
@Override
public ByteString getBlocksBuffer() {
throw new UnsupportedOperationException();
}
@Override
public long[] getBlockListAsLongs() {
// should only be used for backwards compat, DN.ver > NN.ver
throw new UnsupportedOperationException();
}
}
}
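
The datanode selection in ProvidedDescriptor.chooseRandom reduces to a reusable technique: take the ceiling entry of a random UUID in a sorted map, wrap around to the first entry when there is none, and skip entries that are excluded or already explored. A self-contained sketch of that technique (plain Java, no HDFS types) follows; as the TODO above notes, the result is skewed toward keys that follow large gaps, so it is not uniformly random.

import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;

public class CeilingEntryChooser {
  static String choose(NavigableMap<String, String> members,
      Set<String> excluded) {
    Set<String> explored = new HashSet<>();
    while (explored.size() < members.size()) {
      Map.Entry<String, String> e =
          members.ceilingEntry(UUID.randomUUID().toString());
      if (e == null) {
        e = members.firstEntry(); // wrap around past the largest key
      }
      String key = e.getKey();
      if (!explored.add(key)) {
        continue; // already considered, not selected earlier
      }
      if (excluded.contains(key)) {
        continue; // excluded by the caller
      }
      return e.getValue();
    }
    return null; // every member was excluded
  }

  public static void main(String[] args) {
    NavigableMap<String, String> dns = new ConcurrentSkipListMap<>();
    for (int i = 0; i < 5; i++) {
      dns.put(UUID.randomUUID().toString(), "dn-" + i);
    }
    System.out.println(choose(dns, new HashSet<String>()));
  }
}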

View File

@ -4621,15 +4621,31 @@
</description>
</property>
<property>
<name>dfs.namenode.provided.enabled</name>
<value>false</value>
<description>
Enables the Namenode to handle provided storages.
</description>
</property>
<property>
<name>dfs.namenode.block.provider.class</name>
<value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value>
<description>
The class that is used to load provided blocks in the Namenode.
</description>
</property>
<property>
<name>dfs.provider.class</name>
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
<description>
The class that is used to load information about blocks stored in
provided storages.
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
is used as the default, which expects the blocks to be specified
using a delimited text file.
</description>
</property>
@ -4637,7 +4653,7 @@
<name>dfs.provided.df.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
<description>
The class that is used to measure usage statistics of provided stores.
</description>
</property>
@ -4645,7 +4661,7 @@
<name>dfs.provided.storage.id</name>
<value>DS-PROVIDED</value>
<description>
The storage ID used for provided stores.
</description>
</property>

View File

@ -84,6 +84,7 @@ public class TestBlockStoragePolicy {
static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
static final byte PROVIDED = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
@Test (timeout=300000)
public void testConfigKeyEnabled() throws IOException {
@ -143,6 +144,9 @@ public void testDefaultPolicies() {
expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
", storageTypes=[SSD], creationFallbacks=[DISK], " +
"replicationFallbacks=[DISK]}");
expectedPolicyStrings.put(PROVIDED, "BlockStoragePolicy{PROVIDED:" + PROVIDED +
", storageTypes=[PROVIDED, DISK], creationFallbacks=[PROVIDED, DISK], " +
"replicationFallbacks=[PROVIDED, DISK]}");
for(byte i = 1; i < 16; i++) {
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);

View File

@ -300,7 +300,7 @@ public void reloadCachedMappings(List<String> names) {
*/
@Test
public void testSortLocatedBlocks() throws IOException, URISyntaxException {
HelperFunction(null);
HelperFunction(null, 0);
}
/**
@ -312,7 +312,7 @@ public void testSortLocatedBlocks() throws IOException, URISyntaxException {
*/
@Test
public void testgoodScript() throws IOException, URISyntaxException {
HelperFunction("/" + Shell.appendScriptExtension("topology-script"));
HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 0);
}
@ -325,7 +325,21 @@ public void testgoodScript() throws IOException, URISyntaxException {
*/
@Test
public void testBadScript() throws IOException, URISyntaxException {
HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), 0);
}
/**
* Test with different sorting functions but include datanodes
* with provided storage.
* @throws IOException
* @throws URISyntaxException
*/
@Test
public void testWithProvidedTypes() throws IOException, URISyntaxException {
HelperFunction(null, 1);
HelperFunction(null, 3);
HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 1);
HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 2);
}
/**
@ -333,11 +347,12 @@ public void testBadScript() throws IOException, URISyntaxException {
* we invoke this function with and without topology scripts
*
* @param scriptFileName - Script Name or null
* @param providedStorages - number of provided storages to add
*
* @throws URISyntaxException
* @throws IOException
*/
public void HelperFunction(String scriptFileName)
public void HelperFunction(String scriptFileName, int providedStorages)
throws URISyntaxException, IOException {
// create the DatanodeManager which will be tested
Configuration conf = new Configuration();
@ -352,17 +367,25 @@ public void HelperFunction(String scriptFileName)
}
DatanodeManager dm = mockDatanodeManager(fsn, conf);
int totalDNs = 5 + providedStorages;
// register the datanodes, each with a different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];
String[] storageIDs = new String[5];
StorageType[] storageTypes = new StorageType[]{
StorageType.ARCHIVE,
StorageType.DEFAULT,
StorageType.DISK,
StorageType.RAM_DISK,
StorageType.SSD
};
for (int i = 0; i < 5; i++) {
DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
String[] storageIDs = new String[totalDNs];
List<StorageType> storageTypesList = new ArrayList<>(
Arrays.asList(StorageType.ARCHIVE,
StorageType.DEFAULT,
StorageType.DISK,
StorageType.RAM_DISK,
StorageType.SSD));
for (int i = 0; i < providedStorages; i++) {
storageTypesList.add(StorageType.PROVIDED);
}
StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
for (int i = 0; i < totalDNs; i++) {
// register new datanode
String uuid = "UUID-" + i;
String ip = "IP-" + i;
@ -398,9 +421,9 @@ public void HelperFunction(String scriptFileName)
DatanodeInfo[] sortedLocs = block.getLocations();
storageIDs = block.getStorageIDs();
storageTypes = block.getStorageTypes();
assertThat(sortedLocs.length, is(5));
assertThat(storageIDs.length, is(5));
assertThat(storageTypes.length, is(5));
assertThat(sortedLocs.length, is(totalDNs));
assertThat(storageIDs.length, is(totalDNs));
assertThat(storageTypes.length, is(totalDNs));
for (int i = 0; i < sortedLocs.length; i++) {
assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageID(),
is(storageIDs[i]));
@ -414,6 +437,14 @@ public void HelperFunction(String scriptFileName)
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertThat(sortedLocs[sortedLocs.length - 2].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
// check that the StorageType of the datanodes immediately
// preceding the decommissioned datanodes is PROVIDED
for (int i = 0; i < providedStorages; i++) {
assertThat(
((DatanodeInfoWithStorage)
sortedLocs[sortedLocs.length - 3 - i]).getStorageType(),
is(StorageType.PROVIDED));
}
}
/**

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Random;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;
import org.apache.hadoop.hdfs.server.common.BlockFormat;
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class TestNameNodeProvidedImplementation {
@Rule public TestName name = new TestName();
public static final Logger LOG =
LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class);
final Random r = new Random();
final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
final Path BASE = new Path(fBASE.toURI().toString());
final Path NAMEPATH = new Path(BASE, "providedDir");
final Path NNDIRPATH = new Path(BASE, "nnDir");
final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv");
final String SINGLEUSER = "usr1";
final String SINGLEGROUP = "grp1";
Configuration conf;
MiniDFSCluster cluster;
@Before
public void setSeed() throws Exception {
if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) {
throw new IOException("Could not fully delete " + fBASE);
}
long seed = r.nextLong();
r.setSeed(seed);
System.out.println(name.getMethodName() + " seed: " + seed);
conf = new HdfsConfiguration();
conf.set(SingleUGIResolver.USER, SINGLEUSER);
conf.set(SingleUGIResolver.GROUP, SINGLEGROUP);
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
BlockFormatProvider.class, BlockProvider.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
TextFileRegionProvider.class, FileRegionProvider.class);
conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
TextFileRegionFormat.class, BlockFormat.class);
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH,
BLOCKFILE.toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
BLOCKFILE.toString());
conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
File imageDir = new File(NAMEPATH.toUri());
if (!imageDir.exists()) {
LOG.info("Creating directory: " + imageDir);
imageDir.mkdirs();
}
File nnDir = new File(NNDIRPATH.toUri());
if (!nnDir.exists()) {
nnDir.mkdirs();
}
// create 10 random files under NAMEPATH
for (int i=0; i < 10; i++) {
File newFile = new File(new Path(NAMEPATH, "file" + i).toUri());
if(!newFile.exists()) {
try {
LOG.info("Creating " + newFile.toString());
newFile.createNewFile();
Writer writer = new OutputStreamWriter(
new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
for(int j=0; j < 10*i; j++) {
writer.write("0");
}
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@After
public void shutdown() throws Exception {
try {
if (cluster != null) {
cluster.shutdown(true, true);
}
} finally {
cluster = null;
}
}
void createImage(TreeWalk t, Path out,
Class<? extends BlockResolver> blockIdsClass) throws Exception {
ImageWriter.Options opts = ImageWriter.defaults();
opts.setConf(conf);
opts.output(out.toString())
.blocks(TextFileRegionFormat.class)
.blockIds(blockIdsClass);
try (ImageWriter w = new ImageWriter(opts)) {
for (TreePath e : t) {
w.accept(e);
}
}
}
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode)
throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
if (storageTypesPerDatanode != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(false)
.manageNameDfsDirs(false)
.numDataNodes(numDatanodes)
.storageTypes(storageTypesPerDatanode)
.build();
} else if (storageTypes != null) {
cluster = new MiniDFSCluster.Builder(conf)
.format(false)
.manageNameDfsDirs(false)
.numDataNodes(numDatanodes)
.storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes)
.build();
} else {
cluster = new MiniDFSCluster.Builder(conf)
.format(false)
.manageNameDfsDirs(false)
.numDataNodes(numDatanodes)
.build();
}
cluster.waitActive();
}
@Test(timeout = 20000)
public void testLoadImage() throws Exception {
final long seed = r.nextLong();
LOG.info("NAMEPATH: " + NAMEPATH);
createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED}, null);
FileSystem fs = cluster.getFileSystem();
for (TreePath e : new RandomTreeWalk(seed)) {
FileStatus rs = e.getFileStatus();
Path hp = new Path(rs.getPath().toUri().getPath());
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
assertEquals(rs.getPath().toUri().getPath(),
hs.getPath().toUri().getPath());
assertEquals(rs.getPermission(), hs.getPermission());
assertEquals(rs.getLen(), hs.getLen());
assertEquals(SINGLEUSER, hs.getOwner());
assertEquals(SINGLEGROUP, hs.getGroup());
assertEquals(rs.getAccessTime(), hs.getAccessTime());
assertEquals(rs.getModificationTime(), hs.getModificationTime());
}
}
@Test(timeout=20000)
public void testBlockLoad() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
SingleUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED}, null);
}
@Test(timeout=500000)
public void testDefaultReplication() throws Exception {
int targetReplication = 2;
conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockMultiReplicaResolver.class);
// make the last Datanode with only DISK
startCluster(NNDIRPATH, 3, null,
new StorageType[][] {
{StorageType.PROVIDED},
{StorageType.PROVIDED},
{StorageType.DISK}}
);
// wait for the replication to finish
Thread.sleep(50000);
FileSystem fs = cluster.getFileSystem();
int count = 0;
for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
FileStatus rs = e.getFileStatus();
Path hp = removePrefix(NAMEPATH, rs.getPath());
LOG.info("hp " + hp.toUri().getPath());
//skip HDFS specific files, which may have been created later on.
if (hp.toString().contains("in_use.lock")
|| hp.toString().contains("current")) {
continue;
}
e.accept(count++);
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
if (rs.isFile()) {
BlockLocation[] bl = fs.getFileBlockLocations(
hs.getPath(), 0, hs.getLen());
int i = 0;
for(; i < bl.length; i++) {
int currentRep = bl[i].getHosts().length;
assertEquals(targetReplication , currentRep);
}
}
}
}
static Path removePrefix(Path base, Path walk) {
Path wpath = new Path(walk.toUri().getPath());
Path bpath = new Path(base.toUri().getPath());
Path ret = new Path("/");
while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) {
ret = "".equals(ret.getName())
? new Path("/", wpath.getName())
: new Path(new Path("/", wpath.getName()),
new Path(ret.toString().substring(1)));
wpath = wpath.getParent();
}
if (!bpath.equals(wpath)) {
throw new IllegalArgumentException(base + " not a prefix of " + walk);
}
return ret;
}
@Test(timeout=30000)
public void testBlockRead() throws Exception {
conf.setClass(ImageWriter.Options.UGI_CLASS,
FsUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED}, null);
FileSystem fs = cluster.getFileSystem();
Thread.sleep(2000);
int count = 0;
// read NN metadata, verify contents match
for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
FileStatus rs = e.getFileStatus();
Path hp = removePrefix(NAMEPATH, rs.getPath());
LOG.info("hp " + hp.toUri().getPath());
//skip HDFS specific files, which may have been created later on.
if(hp.toString().contains("in_use.lock")
|| hp.toString().contains("current")) {
continue;
}
e.accept(count++);
assertTrue(fs.exists(hp));
FileStatus hs = fs.getFileStatus(hp);
assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath());
assertEquals(rs.getPermission(), hs.getPermission());
assertEquals(rs.getOwner(), hs.getOwner());
assertEquals(rs.getGroup(), hs.getGroup());
if (rs.isFile()) {
assertEquals(rs.getLen(), hs.getLen());
try (ReadableByteChannel i = Channels.newChannel(
new FileInputStream(new File(rs.getPath().toUri())))) {
try (ReadableByteChannel j = Channels.newChannel(
fs.open(hs.getPath()))) {
ByteBuffer ib = ByteBuffer.allocate(4096);
ByteBuffer jb = ByteBuffer.allocate(4096);
while (true) {
int il = i.read(ib);
int jl = j.read(jb);
if (il < 0 || jl < 0) {
assertEquals(il, jl);
break;
}
ib.flip();
jb.flip();
int cmp = Math.min(ib.remaining(), jb.remaining());
for (int k = 0; k < cmp; ++k) {
assertEquals(ib.get(), jb.get());
}
ib.compact();
jb.compact();
}
}
}
}
}
}
}