HDFS-11663. [READ] Fix NullPointerException in ProvidedBlocksBuilder

This commit is contained in:
Virajith Jalaparti 2017-05-04 13:06:53 -07:00 committed by Chris Douglas
parent 1108cb7691
commit aa5ec85f7f
2 changed files with 77 additions and 33 deletions

View File

@ -134,11 +134,13 @@ public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
class ProvidedBlocksBuilder extends LocatedBlockBuilder { class ProvidedBlocksBuilder extends LocatedBlockBuilder {
private ShadowDatanodeInfoWithStorage pending; private ShadowDatanodeInfoWithStorage pending;
private boolean hasProvidedLocations;
ProvidedBlocksBuilder(int maxBlocks) { ProvidedBlocksBuilder(int maxBlocks) {
super(maxBlocks); super(maxBlocks);
pending = new ShadowDatanodeInfoWithStorage( pending = new ShadowDatanodeInfoWithStorage(
providedDescriptor, storageId); providedDescriptor, storageId);
hasProvidedLocations = false;
} }
@Override @Override
@ -154,6 +156,7 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
types[i] = storages[i].getStorageType(); types[i] = storages[i].getStorageType();
if (StorageType.PROVIDED.equals(storages[i].getStorageType())) { if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
locs[i] = pending; locs[i] = pending;
hasProvidedLocations = true;
} else { } else {
locs[i] = new DatanodeInfoWithStorage( locs[i] = new DatanodeInfoWithStorage(
storages[i].getDatanodeDescriptor(), sids[i], types[i]); storages[i].getDatanodeDescriptor(), sids[i], types[i]);
@ -165,25 +168,28 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
@Override @Override
LocatedBlocks build(DatanodeDescriptor client) { LocatedBlocks build(DatanodeDescriptor client) {
// TODO: to support multiple provided storages, need to pass/maintain map // TODO: to support multiple provided storages, need to pass/maintain map
// set all fields of pending DatanodeInfo if (hasProvidedLocations) {
List<String> excludedUUids = new ArrayList<String>(); // set all fields of pending DatanodeInfo
for (LocatedBlock b: blocks) { List<String> excludedUUids = new ArrayList<String>();
DatanodeInfo[] infos = b.getLocations(); for (LocatedBlock b : blocks) {
StorageType[] types = b.getStorageTypes(); DatanodeInfo[] infos = b.getLocations();
StorageType[] types = b.getStorageTypes();
for (int i = 0; i < types.length; i++) { for (int i = 0; i < types.length; i++) {
if (!StorageType.PROVIDED.equals(types[i])) { if (!StorageType.PROVIDED.equals(types[i])) {
excludedUUids.add(infos[i].getDatanodeUuid()); excludedUUids.add(infos[i].getDatanodeUuid());
}
} }
} }
DatanodeDescriptor dn =
providedDescriptor.choose(client, excludedUUids);
if (dn == null) {
dn = providedDescriptor.choose(client);
}
pending.replaceInternal(dn);
} }
DatanodeDescriptor dn = providedDescriptor.choose(client, excludedUUids);
if (dn == null) {
dn = providedDescriptor.choose(client);
}
pending.replaceInternal(dn);
return new LocatedBlocks( return new LocatedBlocks(
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy); flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
} }
@ -278,7 +284,8 @@ DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
DatanodeDescriptor choose(DatanodeDescriptor client) { DatanodeDescriptor choose(DatanodeDescriptor client) {
// exact match for now // exact match for now
DatanodeDescriptor dn = dns.get(client.getDatanodeUuid()); DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null;
if (null == dn) { if (null == dn) {
dn = chooseRandom(); dn = chooseRandom();
} }
@ -288,7 +295,8 @@ DatanodeDescriptor choose(DatanodeDescriptor client) {
DatanodeDescriptor choose(DatanodeDescriptor client, DatanodeDescriptor choose(DatanodeDescriptor client,
List<String> excludedUUids) { List<String> excludedUUids) {
// exact match for now // exact match for now
DatanodeDescriptor dn = dns.get(client.getDatanodeUuid()); DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null;
if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) { if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
dn = null; dn = null;

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider; import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
@ -69,6 +70,10 @@ public class TestNameNodeProvidedImplementation {
final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv"); final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv");
final String SINGLEUSER = "usr1"; final String SINGLEUSER = "usr1";
final String SINGLEGROUP = "grp1"; final String SINGLEGROUP = "grp1";
private final int numFiles = 10;
private final String filePrefix = "file";
private final String fileSuffix = ".dat";
private final int baseFileLen = 1024;
Configuration conf; Configuration conf;
MiniDFSCluster cluster; MiniDFSCluster cluster;
@ -114,15 +119,16 @@ public void setSeed() throws Exception {
} }
// create 10 random files under BASE // create 10 random files under BASE
for (int i=0; i < 10; i++) { for (int i=0; i < numFiles; i++) {
File newFile = new File(new Path(NAMEPATH, "file" + i).toUri()); File newFile = new File(
new Path(NAMEPATH, filePrefix + i + fileSuffix).toUri());
if(!newFile.exists()) { if(!newFile.exists()) {
try { try {
LOG.info("Creating " + newFile.toString()); LOG.info("Creating " + newFile.toString());
newFile.createNewFile(); newFile.createNewFile();
Writer writer = new OutputStreamWriter( Writer writer = new OutputStreamWriter(
new FileOutputStream(newFile.getAbsolutePath()), "utf-8"); new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
for(int j=0; j < 10*i; j++) { for(int j=0; j < baseFileLen*i; j++) {
writer.write("0"); writer.write("0");
} }
writer.flush(); writer.flush();
@ -161,29 +167,30 @@ void createImage(TreeWalk t, Path out,
void startCluster(Path nspath, int numDatanodes, void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes, StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode) StorageType[][] storageTypesPerDatanode,
boolean doFormat)
throws IOException { throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString()); conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
if (storageTypesPerDatanode != null) { if (storageTypesPerDatanode != null) {
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.format(false) .format(doFormat)
.manageNameDfsDirs(false) .manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes) .numDataNodes(numDatanodes)
.storageTypes(storageTypesPerDatanode) .storageTypes(storageTypesPerDatanode)
.build(); .build();
} else if (storageTypes != null) { } else if (storageTypes != null) {
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.format(false) .format(doFormat)
.manageNameDfsDirs(false) .manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes) .numDataNodes(numDatanodes)
.storagesPerDatanode(storageTypes.length) .storagesPerDatanode(storageTypes.length)
.storageTypes(storageTypes) .storageTypes(storageTypes)
.build(); .build();
} else { } else {
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.format(false) .format(doFormat)
.manageNameDfsDirs(false) .manageNameDfsDirs(doFormat)
.numDataNodes(numDatanodes) .numDataNodes(numDatanodes)
.build(); .build();
} }
@ -195,7 +202,8 @@ public void testLoadImage() throws Exception {
final long seed = r.nextLong(); final long seed = r.nextLong();
LOG.info("NAMEPATH: " + NAMEPATH); LOG.info("NAMEPATH: " + NAMEPATH);
createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class); createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED}, null); startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED},
null, false);
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
for (TreePath e : new RandomTreeWalk(seed)) { for (TreePath e : new RandomTreeWalk(seed)) {
@ -220,7 +228,8 @@ public void testBlockLoad() throws Exception {
SingleUGIResolver.class, UGIResolver.class); SingleUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class); FixedBlockResolver.class);
startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED}, null); startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED},
null, false);
} }
@Test(timeout=500000) @Test(timeout=500000)
@ -232,10 +241,10 @@ public void testDefaultReplication() throws Exception {
// make the last Datanode with only DISK // make the last Datanode with only DISK
startCluster(NNDIRPATH, 3, null, startCluster(NNDIRPATH, 3, null,
new StorageType[][] { new StorageType[][] {
{StorageType.PROVIDED}, {StorageType.PROVIDED},
{StorageType.PROVIDED}, {StorageType.PROVIDED},
{StorageType.DISK}} {StorageType.DISK}},
); false);
// wait for the replication to finish // wait for the replication to finish
Thread.sleep(50000); Thread.sleep(50000);
@ -290,7 +299,8 @@ public void testBlockRead() throws Exception {
FsUGIResolver.class, UGIResolver.class); FsUGIResolver.class, UGIResolver.class);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class); FixedBlockResolver.class);
startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED}, null); startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED},
null, false);
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
Thread.sleep(2000); Thread.sleep(2000);
int count = 0; int count = 0;
@ -342,4 +352,30 @@ public void testBlockRead() throws Exception {
} }
} }
} }
private BlockLocation[] createFile(Path path, short replication,
long fileLen, long blockLen) throws IOException {
FileSystem fs = cluster.getFileSystem();
//create a sample file that is not provided
DFSTestUtil.createFile(fs, path, false, (int) blockLen,
fileLen, blockLen, replication, 0, true);
return fs.getFileBlockLocations(path, 0, fileLen);
}
@Test
public void testClusterWithEmptyImage() throws IOException {
// start a cluster with 2 datanodes without any provided storage
startCluster(NNDIRPATH, 2, null,
new StorageType[][] {
{StorageType.DISK},
{StorageType.DISK}},
true);
assertTrue(cluster.isClusterUp());
assertTrue(cluster.isDataNodeUp());
BlockLocation[] locations = createFile(new Path("/testFile1.dat"),
(short) 2, 1024*1024, 1024*1024);
assertEquals(1, locations.length);
assertEquals(2, locations[0].getHosts().length);
}
} }