HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock. Contributed by Walter Su.

This commit is contained in:
Walter Su 2015-07-20 10:18:34 +08:00
parent 4fdd9abd7e
commit 06394e3760
8 changed files with 403 additions and 200 deletions

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import java.util.Arrays;
@ -32,8 +34,10 @@
@InterfaceStability.Evolving
public class LocatedStripedBlock extends LocatedBlock {
private static final int[] EMPTY_INDICES = {};
private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>();
private int[] blockIndices;
private Token<BlockTokenIdentifier>[] blockTokens;
public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes, int[] indices,
@ -46,6 +50,10 @@ public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
this.blockIndices = new int[indices.length];
System.arraycopy(indices, 0, blockIndices, 0, indices.length);
}
blockTokens = new Token[blockIndices.length];
for (int i = 0; i < blockIndices.length; i++) {
blockTokens[i] = EMPTY_TOKEN;
}
}
@Override
@ -67,4 +75,12 @@ public int[] getBlockIndices() {
public boolean isStriped() {
return true;
}
public Token<BlockTokenIdentifier>[] getBlockTokens() {
return blockTokens;
}
public void setBlockTokens(Token<BlockTokenIdentifier>[] tokens) {
this.blockTokens = tokens;
}
}

View File

@ -362,3 +362,5 @@
HDFS-8787. Erasure coding: rename BlockInfoContiguousUC and BlockInfoStripedUC
to be consistent with trunk. (zhz)
HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549)

View File

@ -813,9 +813,12 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
if (b instanceof LocatedStripedBlock) {
int[] indices = ((LocatedStripedBlock) b).getBlockIndices();
for (int index : indices) {
builder.addBlockIndex(index);
LocatedStripedBlock sb = (LocatedStripedBlock) b;
int[] indices = sb.getBlockIndices();
Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
for (int i = 0; i < indices.length; i++) {
builder.addBlockIndex(indices[i]);
builder.addBlockTokens(PBHelper.convert(blockTokens[i]));
}
}
@ -872,6 +875,12 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
storageIDs, storageTypes, indices, proto.getOffset(),
proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
List<TokenProto> tokenProtos = proto.getBlockTokensList();
Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
for (int i = 0; i < indices.length; i++) {
blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
}
((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
}
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));

View File

@ -92,6 +92,7 @@
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@ -989,9 +990,23 @@ public void setBlockToken(final LocatedBlock b,
final AccessMode mode) throws IOException {
if (isBlockTokenEnabled()) {
// Use cached UGI if serving RPC calls.
b.setBlockToken(blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
b.getBlock(), EnumSet.of(mode)));
if (b.isStriped()) {
LocatedStripedBlock sb = (LocatedStripedBlock) b;
int[] indices = sb.getBlockIndices();
Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock());
for (int i = 0; i < indices.length; i++) {
internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
blockTokens[i] = blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
internalBlock, EnumSet.of(mode));
}
sb.setBlockTokens(blockTokens);
} else {
b.setBlockToken(blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
b.getBlock(), EnumSet.of(mode)));
}
}
}

View File

@ -30,8 +30,10 @@
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.security.token.Token;
import java.nio.ByteBuffer;
import java.util.*;
@ -105,17 +107,22 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInBlockGroup) {
final ExtendedBlock blk = constructInternalBlock(
bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
final LocatedBlock locatedBlock;
if (idxInReturnedLocs < bg.getLocations().length) {
return new LocatedBlock(blk,
locatedBlock = new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset(), bg.isCorrupt(), null);
} else {
return new LocatedBlock(blk, null, null, null,
locatedBlock = new LocatedBlock(blk, null, null, null,
bg.getStartOffset(), bg.isCorrupt(), null);
}
Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
if (idxInBlockGroup < blockTokens.length) {
locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]);
}
return locatedBlock;
}
/**

View File

@ -220,7 +220,10 @@ message LocatedBlockProto {
repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
repeated StorageTypeProto storageTypes = 7;
repeated string storageIDs = 8;
// striped block related fields
repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage
repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token
}
message DataEncryptionKeyProto {

View File

@ -24,7 +24,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
@ -69,28 +68,32 @@
public class TestBlockTokenWithDFS {
private static final int BLOCK_SIZE = 1024;
private static final int FILE_SIZE = 2 * BLOCK_SIZE;
protected static int BLOCK_SIZE = 1024;
protected static int FILE_SIZE = 2 * BLOCK_SIZE;
private static final String FILE_TO_READ = "/fileToRead.dat";
private static final String FILE_TO_WRITE = "/fileToWrite.dat";
private static final String FILE_TO_APPEND = "/fileToAppend.dat";
private final byte[] rawData = new byte[FILE_SIZE];
{
((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
Random r = new Random();
r.nextBytes(rawData);
}
private void createFile(FileSystem fs, Path filename) throws IOException {
public static byte[] generateBytes(int fileSize){
Random r = new Random();
byte[] rawData = new byte[fileSize];
r.nextBytes(rawData);
return rawData;
}
private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException {
FSDataOutputStream out = fs.create(filename);
out.write(rawData);
out.write(expected);
out.close();
}
// read a file using blockSeekTo()
private boolean checkFile1(FSDataInputStream in) {
byte[] toRead = new byte[FILE_SIZE];
private boolean checkFile1(FSDataInputStream in, byte[] expected) {
byte[] toRead = new byte[expected.length];
int totalRead = 0;
int nRead = 0;
try {
@ -101,27 +104,27 @@ private boolean checkFile1(FSDataInputStream in) {
return false;
}
assertEquals("Cannot read file.", toRead.length, totalRead);
return checkFile(toRead);
return checkFile(toRead, expected);
}
// read a file using fetchBlockByteRange()
private boolean checkFile2(FSDataInputStream in) {
byte[] toRead = new byte[FILE_SIZE];
private boolean checkFile2(FSDataInputStream in, byte[] expected) {
byte[] toRead = new byte[expected.length];
try {
assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0,
toRead.length));
} catch (IOException e) {
return false;
}
return checkFile(toRead);
return checkFile(toRead, expected);
}
private boolean checkFile(byte[] fileToCheck) {
if (fileToCheck.length != rawData.length) {
private boolean checkFile(byte[] fileToCheck, byte[] expected) {
if (fileToCheck.length != expected.length) {
return false;
}
for (int i = 0; i < fileToCheck.length; i++) {
if (fileToCheck[i] != rawData[i]) {
if (fileToCheck[i] != expected[i]) {
return false;
}
}
@ -137,7 +140,7 @@ private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
}
// try reading a block using a BlockReader directly
private static void tryRead(final Configuration conf, LocatedBlock lblock,
protected void tryRead(final Configuration conf, LocatedBlock lblock,
boolean shouldSucceed) {
InetSocketAddress targetAddr = null;
IOException ioe = null;
@ -148,7 +151,7 @@ private static void tryRead(final Configuration conf, LocatedBlock lblock,
targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setFileName(BlockReaderFactory.getFileName(targetAddr,
setFileName(BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId())).
setBlock(block).
setBlockToken(lblock.getBlockToken()).
@ -205,7 +208,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
}
// get a conf for testing
private static Configuration getConf(int numDataNodes) {
protected Configuration getConf(int numDataNodes) {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@ -241,16 +244,16 @@ public void testAppend() throws Exception {
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
Path fileToAppend = new Path(FILE_TO_APPEND);
FileSystem fs = cluster.getFileSystem();
byte[] expected = generateBytes(FILE_SIZE);
// write a one-byte file
FSDataOutputStream stm = writeFile(fs, fileToAppend,
(short) numDataNodes, BLOCK_SIZE);
stm.write(rawData, 0, 1);
stm.write(expected, 0, 1);
stm.close();
// open the file again for append
stm = fs.append(fileToAppend);
int mid = rawData.length - 1;
stm.write(rawData, 1, mid - 1);
int mid = expected.length - 1;
stm.write(expected, 1, mid - 1);
stm.hflush();
/*
@ -267,11 +270,11 @@ public void testAppend() throws Exception {
// remove a datanode to force re-establishing pipeline
cluster.stopDataNode(0);
// append the rest of the file
stm.write(rawData, mid, rawData.length - mid);
stm.write(expected, mid, expected.length - mid);
stm.close();
// check if append is successful
FSDataInputStream in5 = fs.open(fileToAppend);
assertTrue(checkFile1(in5));
assertTrue(checkFile1(in5, expected));
} finally {
if (cluster != null) {
cluster.shutdown();
@ -303,11 +306,12 @@ public void testWrite() throws Exception {
Path fileToWrite = new Path(FILE_TO_WRITE);
FileSystem fs = cluster.getFileSystem();
byte[] expected = generateBytes(FILE_SIZE);
FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes,
BLOCK_SIZE);
// write a partial block
int mid = rawData.length - 1;
stm.write(rawData, 0, mid);
int mid = expected.length - 1;
stm.write(expected, 0, mid);
stm.hflush();
/*
@ -324,11 +328,11 @@ public void testWrite() throws Exception {
// remove a datanode to force re-establishing pipeline
cluster.stopDataNode(0);
// write the rest of the file
stm.write(rawData, mid, rawData.length - mid);
stm.write(expected, mid, expected.length - mid);
stm.close();
// check if write is successful
FSDataInputStream in4 = fs.open(fileToWrite);
assertTrue(checkFile1(in4));
assertTrue(checkFile1(in4, expected));
} finally {
if (cluster != null) {
cluster.shutdown();
@ -346,125 +350,137 @@ public void testRead() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
doTestRead(conf, cluster, false);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
final NameNode nn = cluster.getNameNode();
final NamenodeProtocols nnProto = nn.getRpcServer();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
protected void doTestRead(Configuration conf, MiniDFSCluster cluster,
boolean isStriped) throws Exception {
final int numDataNodes = cluster.getDataNodes().size();
final NameNode nn = cluster.getNameNode();
final NamenodeProtocols nnProto = nn.getRpcServer();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
// set a short token lifetime (1 second) initially
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
// set a short token lifetime (1 second) initially
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
Path fileToRead = new Path(FILE_TO_READ);
FileSystem fs = cluster.getFileSystem();
createFile(fs, fileToRead);
Path fileToRead = new Path(FILE_TO_READ);
FileSystem fs = cluster.getFileSystem();
byte[] expected = generateBytes(FILE_SIZE);
createFile(fs, fileToRead, expected);
/*
* setup for testing expiration handling of cached tokens
*/
// read using blockSeekTo(). Acquired tokens are cached in in1
FSDataInputStream in1 = fs.open(fileToRead);
assertTrue(checkFile1(in1));
// read using blockSeekTo(). Acquired tokens are cached in in2
FSDataInputStream in2 = fs.open(fileToRead);
assertTrue(checkFile1(in2));
// read using fetchBlockByteRange(). Acquired tokens are cached in in3
FSDataInputStream in3 = fs.open(fileToRead);
assertTrue(checkFile2(in3));
// read using blockSeekTo(). Acquired tokens are cached in in1
FSDataInputStream in1 = fs.open(fileToRead);
assertTrue(checkFile1(in1,expected));
// read using blockSeekTo(). Acquired tokens are cached in in2
FSDataInputStream in2 = fs.open(fileToRead);
assertTrue(checkFile1(in2,expected));
// read using fetchBlockByteRange(). Acquired tokens are cached in in3
FSDataInputStream in3 = fs.open(fileToRead);
assertTrue(checkFile2(in3,expected));
/*
* testing READ interface on DN using a BlockReader
*/
DFSClient client = null;
try {
client = new DFSClient(new InetSocketAddress("localhost",
DFSClient client = null;
try {
client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
} finally {
if (client != null) client.close();
}
List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
LocatedBlock lblock = locatedBlocks.get(0); // first block
Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
// verify token is not expired
assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
// read with valid token, should succeed
tryRead(conf, lblock, true);
} finally {
if (client != null) client.close();
}
List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
LocatedBlock lblock = locatedBlocks.get(0); // first block
// verify token is not expired
assertFalse(isBlockTokenExpired(lblock));
// read with valid token, should succeed
tryRead(conf, lblock, true);
/*
* wait till myToken and all cached tokens in in1, in2 and in3 expire
*/
while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
while (!isBlockTokenExpired(lblock)) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
/*
* continue testing READ interface on DN using a BlockReader
*/
// verify token is expired
assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
// read should fail
tryRead(conf, lblock, false);
// use a valid new token
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
// read should succeed
tryRead(conf, lblock, true);
// use a token with wrong blockID
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
lblock.setBlockToken(sm.generateToken(wrongBlock,
EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
// read should fail
tryRead(conf, lblock, false);
// use a token with wrong access modes
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE,
BlockTokenIdentifier.AccessMode.COPY,
BlockTokenIdentifier.AccessMode.REPLACE)));
// read should fail
tryRead(conf, lblock, false);
// verify token is expired
assertTrue(isBlockTokenExpired(lblock));
// read should fail
tryRead(conf, lblock, false);
// use a valid new token
bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
// read should succeed
tryRead(conf, lblock, true);
// use a token with wrong blockID
long rightId = lblock.getBlock().getBlockId();
long wrongId = rightId + 1;
lblock.getBlock().setBlockId(wrongId);
bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
lblock.getBlock().setBlockId(rightId);
// read should fail
tryRead(conf, lblock, false);
// use a token with wrong access modes
bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE);
// read should fail
tryRead(conf, lblock, false);
// set a long token lifetime for future tokens
SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
// set a long token lifetime for future tokens
SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
/*
* testing that when cached tokens are expired, DFSClient will re-fetch
* tokens transparently for READ.
*/
// confirm all tokens cached in in1 are expired by now
List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
for (LocatedBlock blk : lblocks) {
assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify blockSeekTo() is able to re-fetch token transparently
in1.seek(0);
assertTrue(checkFile1(in1));
// confirm all tokens cached in in1 are expired by now
List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
for (LocatedBlock blk : lblocks) {
assertTrue(isBlockTokenExpired(blk));
}
// verify blockSeekTo() is able to re-fetch token transparently
in1.seek(0);
assertTrue(checkFile1(in1, expected));
// confirm all tokens cached in in2 are expired by now
List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
for (LocatedBlock blk : lblocks2) {
assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify blockSeekTo() is able to re-fetch token transparently (testing
// via another interface method)
// confirm all tokens cached in in2 are expired by now
List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
for (LocatedBlock blk : lblocks2) {
assertTrue(isBlockTokenExpired(blk));
}
// verify blockSeekTo() is able to re-fetch token transparently (testing
// via another interface method)
if (isStriped) {
// striped block doesn't support seekToNewSource
in2.seek(0);
} else {
assertTrue(in2.seekToNewSource(0));
assertTrue(checkFile1(in2));
}
assertTrue(checkFile1(in2,expected));
// confirm all tokens cached in in3 are expired by now
List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
for (LocatedBlock blk : lblocks3) {
assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify fetchBlockByteRange() is able to re-fetch token transparently
assertTrue(checkFile2(in3));
// confirm all tokens cached in in3 are expired by now
List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
for (LocatedBlock blk : lblocks3) {
assertTrue(isBlockTokenExpired(blk));
}
// verify fetchBlockByteRange() is able to re-fetch token transparently
assertTrue(checkFile2(in3,expected));
/*
* testing that after datanodes are restarted on the same ports, cached
@ -473,37 +489,42 @@ public void testRead() throws Exception {
* new tokens can be fetched from namenode).
*/
// restart datanodes on the same ports that they currently use
assertTrue(cluster.restartDataNodes(true));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
cluster.shutdownNameNode(0);
// restart datanodes on the same ports that they currently use
assertTrue(cluster.restartDataNodes(true));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
cluster.shutdownNameNode(0);
// confirm tokens cached in in1 are still valid
lblocks = DFSTestUtil.getAllBlocks(in1);
for (LocatedBlock blk : lblocks) {
assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify blockSeekTo() still works (forced to use cached tokens)
in1.seek(0);
assertTrue(checkFile1(in1));
// confirm tokens cached in in1 are still valid
lblocks = DFSTestUtil.getAllBlocks(in1);
for (LocatedBlock blk : lblocks) {
assertFalse(isBlockTokenExpired(blk));
}
// verify blockSeekTo() still works (forced to use cached tokens)
in1.seek(0);
assertTrue(checkFile1(in1,expected));
// confirm tokens cached in in2 are still valid
lblocks2 = DFSTestUtil.getAllBlocks(in2);
for (LocatedBlock blk : lblocks2) {
assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify blockSeekTo() still works (forced to use cached tokens)
// confirm tokens cached in in2 are still valid
lblocks2 = DFSTestUtil.getAllBlocks(in2);
for (LocatedBlock blk : lblocks2) {
assertFalse(isBlockTokenExpired(blk));
}
// verify blockSeekTo() still works (forced to use cached tokens)
if (isStriped) {
in2.seek(0);
} else {
in2.seekToNewSource(0);
assertTrue(checkFile1(in2));
}
assertTrue(checkFile1(in2,expected));
// confirm tokens cached in in3 are still valid
lblocks3 = DFSTestUtil.getAllBlocks(in3);
for (LocatedBlock blk : lblocks3) {
assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
}
// verify fetchBlockByteRange() still works (forced to use cached tokens)
assertTrue(checkFile2(in3));
// confirm tokens cached in in3 are still valid
lblocks3 = DFSTestUtil.getAllBlocks(in3);
for (LocatedBlock blk : lblocks3) {
assertFalse(isBlockTokenExpired(blk));
}
// verify fetchBlockByteRange() still works (forced to use cached tokens)
assertTrue(checkFile2(in3,expected));
/*
* testing that when namenode is restarted, cached tokens should still
@ -512,18 +533,23 @@ public void testRead() throws Exception {
* setup for this test depends on the previous test.
*/
// restart the namenode and then shut it down for test
cluster.restartNameNode(0);
cluster.shutdownNameNode(0);
// restart the namenode and then shut it down for test
cluster.restartNameNode(0);
cluster.shutdownNameNode(0);
// verify blockSeekTo() still works (forced to use cached tokens)
in1.seek(0);
assertTrue(checkFile1(in1));
// verify again blockSeekTo() still works (forced to use cached tokens)
// verify blockSeekTo() still works (forced to use cached tokens)
in1.seek(0);
assertTrue(checkFile1(in1,expected));
// verify again blockSeekTo() still works (forced to use cached tokens)
if (isStriped) {
in2.seek(0);
} else {
in2.seekToNewSource(0);
assertTrue(checkFile1(in2));
// verify fetchBlockByteRange() still works (forced to use cached tokens)
assertTrue(checkFile2(in3));
}
assertTrue(checkFile1(in2,expected));
// verify fetchBlockByteRange() still works (forced to use cached tokens)
assertTrue(checkFile2(in3,expected));
/*
* testing that after both namenode and datanodes got restarted (namenode
@ -532,58 +558,60 @@ public void testRead() throws Exception {
* setup of this test depends on the previous test.
*/
// restore the cluster and restart the datanodes for test
cluster.restartNameNode(0);
assertTrue(cluster.restartDataNodes(true));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
// restore the cluster and restart the datanodes for test
cluster.restartNameNode(0);
assertTrue(cluster.restartDataNodes(true));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
// shutdown namenode so that DFSClient can't get new tokens from namenode
cluster.shutdownNameNode(0);
// shutdown namenode so that DFSClient can't get new tokens from namenode
cluster.shutdownNameNode(0);
// verify blockSeekTo() fails (cached tokens become invalid)
in1.seek(0);
assertFalse(checkFile1(in1));
// verify fetchBlockByteRange() fails (cached tokens become invalid)
assertFalse(checkFile2(in3));
// verify blockSeekTo() fails (cached tokens become invalid)
in1.seek(0);
assertFalse(checkFile1(in1,expected));
// verify fetchBlockByteRange() fails (cached tokens become invalid)
assertFalse(checkFile2(in3,expected));
// restart the namenode to allow DFSClient to re-fetch tokens
cluster.restartNameNode(0);
// verify blockSeekTo() works again (by transparently re-fetching
// tokens from namenode)
in1.seek(0);
assertTrue(checkFile1(in1));
// restart the namenode to allow DFSClient to re-fetch tokens
cluster.restartNameNode(0);
// verify blockSeekTo() works again (by transparently re-fetching
// tokens from namenode)
in1.seek(0);
assertTrue(checkFile1(in1,expected));
if (isStriped) {
in2.seek(0);
} else {
in2.seekToNewSource(0);
assertTrue(checkFile1(in2));
// verify fetchBlockByteRange() works again (by transparently
// re-fetching tokens from namenode)
assertTrue(checkFile2(in3));
}
assertTrue(checkFile1(in2,expected));
// verify fetchBlockByteRange() works again (by transparently
// re-fetching tokens from namenode)
assertTrue(checkFile2(in3,expected));
/*
* testing that when datanodes are restarted on different ports, DFSClient
* is able to re-fetch tokens transparently to connect to them
*/
// restart datanodes on newly assigned ports
assertTrue(cluster.restartDataNodes(false));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
// verify blockSeekTo() is able to re-fetch token transparently
in1.seek(0);
assertTrue(checkFile1(in1));
// verify blockSeekTo() is able to re-fetch token transparently
// restart datanodes on newly assigned ports
assertTrue(cluster.restartDataNodes(false));
cluster.waitActive();
assertEquals(numDataNodes, cluster.getDataNodes().size());
// verify blockSeekTo() is able to re-fetch token transparently
in1.seek(0);
assertTrue(checkFile1(in1,expected));
// verify blockSeekTo() is able to re-fetch token transparently
if (isStriped) {
in2.seek(0);
} else {
in2.seekToNewSource(0);
assertTrue(checkFile1(in2));
// verify fetchBlockByteRange() is able to re-fetch token transparently
assertTrue(checkFile2(in3));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
assertTrue(checkFile1(in2,expected));
// verify fetchBlockByteRange() is able to re-fetch token transparently
assertTrue(checkFile2(in3,expected));
}
/**
* Integration testing of access token, involving NN, DN, and Balancer
*/
@ -593,4 +621,8 @@ public void testEnd2End() throws Exception {
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
new TestBalancer().integrationTest(conf);
}
protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken());
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
private final static int numDNs = dataBlocks + parityBlocks + 2;
private static MiniDFSCluster cluster;
private static Configuration conf;
{
BLOCK_SIZE = cellSize * stripesPerBlock;
FILE_SIZE = BLOCK_SIZE * dataBlocks * 3;
}
@Before
public void setup() throws IOException {
conf = getConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient()
.createErasureCodingZone("/", null, cellSize);
cluster.waitActive();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
private Configuration getConf() {
Configuration conf = super.getConf(numDNs);
conf.setInt("io.bytes.per.checksum", cellSize);
return conf;
}
@Test
@Override
public void testRead() throws Exception {
//TODO: DFSStripedInputStream handles token expiration
// doTestRead(conf, cluster, true);
}
@Test
@Override
public void testWrite() throws Exception {
//TODO: DFSStripedOutputStream handles token expiration
}
@Test
@Override
public void testAppend() throws Exception {
//TODO: support Append for striped file
}
@Test
@Override
public void testEnd2End() throws Exception {
//TODO: DFSStripedOutputStream handles token expiration
}
@Override
protected void tryRead(final Configuration conf, LocatedBlock lblock,
boolean shouldSucceed) {
LocatedStripedBlock lsb = (LocatedStripedBlock) lblock;
LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
(lsb, cellSize, dataBlocks, parityBlocks);
for (LocatedBlock internalBlock : internalBlocks) {
super.tryRead(conf, internalBlock, shouldSucceed);
}
}
@Override
protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup
(lsb, cellSize, dataBlocks, parityBlocks);
for (LocatedBlock internalBlock : internalBlocks) {
if(super.isBlockTokenExpired(internalBlock)){
return true;
}
}
return false;
}
}