diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index dc5a77fab2..6e62220ac9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
 import java.util.Arrays;
 
@@ -32,8 +34,10 @@
 @InterfaceStability.Evolving
 public class LocatedStripedBlock extends LocatedBlock {
   private static final int[] EMPTY_INDICES = {};
+  private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>();
 
   private int[] blockIndices;
+  private Token<BlockTokenIdentifier>[] blockTokens;
 
   public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
       String[] storageIDs, StorageType[] storageTypes, int[] indices,
@@ -46,6 +50,10 @@ public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
       this.blockIndices = new int[indices.length];
       System.arraycopy(indices, 0, blockIndices, 0, indices.length);
     }
+    blockTokens = new Token[blockIndices.length];
+    for (int i = 0; i < blockIndices.length; i++) {
+      blockTokens[i] = EMPTY_TOKEN;
+    }
   }
 
   @Override
@@ -67,4 +75,12 @@ public int[] getBlockIndices() {
   public boolean isStriped() {
     return true;
   }
+
+  public Token<BlockTokenIdentifier>[] getBlockTokens() {
+    return blockTokens;
+  }
+
+  public void setBlockTokens(Token<BlockTokenIdentifier>[] tokens) {
+    this.blockTokens = tokens;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b135c0837e..4709388662 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -362,3 +362,5 @@
     HDFS-8787. Erasure coding: rename BlockInfoContiguousUC and
     BlockInfoStripedUC to be consistent with trunk. (zhz)
 
+    HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.
+    (waltersu4549)
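Note (commentary, not part of the patch): the new accessors keep blockTokens index-aligned with blockIndices, so the token at position i authorizes exactly the internal block reported for the i-th returned storage. A minimal sketch of that contract, assuming only the getters added above (StripedTokenLookup and tokenForStorage are hypothetical names):

    import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    class StripedTokenLookup {
      /**
       * Returns the token paired with the i-th returned storage of a block
       * group, or null if the group carries no token at that position.
       * Relies on blockTokens[i] corresponding to blockIndices[i].
       */
      static Token<BlockTokenIdentifier> tokenForStorage(
          LocatedStripedBlock bg, int idxInReturnedLocs) {
        Token<BlockTokenIdentifier>[] tokens = bg.getBlockTokens();
        return idxInReturnedLocs < tokens.length
            ? tokens[idxInReturnedLocs] : null;
      }
    }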
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 6baa005c67..1bc096498e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -813,9 +813,12 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
       builder.addAllStorageIDs(Arrays.asList(storageIDs));
     }
     if (b instanceof LocatedStripedBlock) {
-      int[] indices = ((LocatedStripedBlock) b).getBlockIndices();
-      for (int index : indices) {
-        builder.addBlockIndex(index);
+      LocatedStripedBlock sb = (LocatedStripedBlock) b;
+      int[] indices = sb.getBlockIndices();
+      Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
+      for (int i = 0; i < indices.length; i++) {
+        builder.addBlockIndex(indices[i]);
+        builder.addBlockTokens(PBHelper.convert(blockTokens[i]));
       }
     }
 
@@ -872,6 +875,12 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
           storageIDs, storageTypes, indices, proto.getOffset(),
           proto.getCorrupt(),
           cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+      List<TokenProto> tokenProtos = proto.getBlockTokensList();
+      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
+      }
+      ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
     }
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
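Note (commentary, not part of the patch): convertLocatedBlock() now emits blockIndex and blockTokens in the same loop, and convertLocatedBlockProto() reads them back positionally, so the decoder assumes one TokenProto per block index. A small sanity-check sketch of that invariant; the checkParallel helper is hypothetical, and getBlockIndexCount()/getBlockTokensCount() are the standard protobuf getters generated for the repeated fields declared in hdfs.proto:

    import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    class LocatedBlockProtoCheck {
      /** Verifies the alignment the decoder above relies on. */
      static void checkParallel(LocatedStripedBlock sb) {
        LocatedBlockProto proto = PBHelper.convertLocatedBlock(sb);
        if (proto.getBlockIndexCount() != proto.getBlockTokensCount()) {
          throw new IllegalStateException(
              "blockIndex and blockTokens must stay parallel on the wire");
        }
      }
    }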
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index edcc14e92d..7872baa22e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -92,6 +92,7 @@
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
@@ -989,9 +990,23 @@ public void setBlockToken(final LocatedBlock b,
       final AccessMode mode) throws IOException {
     if (isBlockTokenEnabled()) {
       // Use cached UGI if serving RPC calls.
-      b.setBlockToken(blockTokenSecretManager.generateToken(
-          NameNode.getRemoteUser().getShortUserName(),
-          b.getBlock(), EnumSet.of(mode)));
+      if (b.isStriped()) {
+        LocatedStripedBlock sb = (LocatedStripedBlock) b;
+        int[] indices = sb.getBlockIndices();
+        Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+        ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock());
+        for (int i = 0; i < indices.length; i++) {
+          internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
+          blockTokens[i] = blockTokenSecretManager.generateToken(
+              NameNode.getRemoteUser().getShortUserName(),
+              internalBlock, EnumSet.of(mode));
+        }
+        sb.setBlockTokens(blockTokens);
+      } else {
+        b.setBlockToken(blockTokenSecretManager.generateToken(
+            NameNode.getRemoteUser().getShortUserName(),
+            b.getBlock(), EnumSet.of(mode)));
+      }
     }
   }
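Note (commentary, not part of the patch): the striped branch derives each internal block's ID by adding the storage's block index to the block-group ID, then mints the token against that derived ID, which is the ID the datanode actually serves. A self-contained illustration of the arithmetic, with example values only; it assumes (as the loop above does) that striped block-group IDs leave the low bits free for the index within the group:

    public class InternalBlockIds {
      public static void main(String[] args) {
        // Example striped block-group ID (group IDs have zeroed low bits).
        long blockGroupId = -9223372036854775792L;
        // 6 data + 3 parity internal blocks for a RS-6-3 layout.
        for (int idx = 0; idx < 9; idx++) {
          // internal block ID = group ID + index in group
          System.out.println("index " + idx + " -> internal block id "
              + (blockGroupId + idx));
        }
      }
    }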
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 6bd5e1f010..9b0939c6dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -30,8 +30,10 @@
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.security.token.Token;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -105,17 +107,22 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
       int idxInBlockGroup) {
     final ExtendedBlock blk = constructInternalBlock(
         bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
-
+    final LocatedBlock locatedBlock;
     if (idxInReturnedLocs < bg.getLocations().length) {
-      return new LocatedBlock(blk,
+      locatedBlock = new LocatedBlock(blk,
           new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
           new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
           new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
           bg.getStartOffset(), bg.isCorrupt(), null);
     } else {
-      return new LocatedBlock(blk, null, null, null,
+      locatedBlock = new LocatedBlock(blk, null, null, null,
           bg.getStartOffset(), bg.isCorrupt(), null);
     }
+    Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
+    if (idxInBlockGroup < blockTokens.length) {
+      locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]);
+    }
+    return locatedBlock;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index e1f944fd72..d2cb66538e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -220,7 +220,10 @@ message LocatedBlockProto {
   repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
   repeated StorageTypeProto storageTypes = 7;
   repeated string storageIDs = 8;
+
+  // striped block related fields
   repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage
+  repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token
 }
 
 message DataEncryptionKeyProto {
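Note (commentary, not part of the patch): with the proto field and the StripedBlockUtil change above, each token travels with its internal block: parseStripedBlockGroup() delegates to constructInternalBlock(), which copies blockTokens[idxInBlockGroup] onto the resulting single-DN LocatedBlock, so unmodified block readers can present the right token. A usage sketch under those assumptions (readAllInternalBlocks is a hypothetical helper mirroring what the striped test below does):

    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
    import org.apache.hadoop.hdfs.util.StripedBlockUtil;

    class StripedReadSketch {
      static void readAllInternalBlocks(LocatedStripedBlock bg, int cellSize,
          int dataBlkNum, int parityBlkNum) {
        LocatedBlock[] internals = StripedBlockUtil.parseStripedBlockGroup(
            bg, cellSize, dataBlkNum, parityBlkNum);
        for (LocatedBlock lb : internals) {
          if (lb == null) {
            continue; // no reported storage for this internal block
          }
          // lb.getBlockToken() is the per-internal-block token set above.
          System.out.println(lb.getBlock() + " -> " + lb.getBlockToken());
        }
      }
    }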
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index 43f2992343..26ed1fe563 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
@@ -69,28 +68,32 @@ public class TestBlockTokenWithDFS {
 
-  private static final int BLOCK_SIZE = 1024;
-  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  protected static int BLOCK_SIZE = 1024;
+  protected static int FILE_SIZE = 2 * BLOCK_SIZE;
   private static final String FILE_TO_READ = "/fileToRead.dat";
   private static final String FILE_TO_WRITE = "/fileToWrite.dat";
   private static final String FILE_TO_APPEND = "/fileToAppend.dat";
-  private final byte[] rawData = new byte[FILE_SIZE];
 
   {
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
-    Random r = new Random();
-    r.nextBytes(rawData);
   }
 
-  private void createFile(FileSystem fs, Path filename) throws IOException {
+  public static byte[] generateBytes(int fileSize) {
+    Random r = new Random();
+    byte[] rawData = new byte[fileSize];
+    r.nextBytes(rawData);
+    return rawData;
+  }
+
+  private void createFile(FileSystem fs, Path filename, byte[] expected)
+      throws IOException {
     FSDataOutputStream out = fs.create(filename);
-    out.write(rawData);
+    out.write(expected);
     out.close();
   }
 
   // read a file using blockSeekTo()
-  private boolean checkFile1(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile1(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     int totalRead = 0;
     int nRead = 0;
     try {
@@ -101,27 +104,27 @@ private boolean checkFile1(FSDataInputStream in) {
       return false;
     }
     assertEquals("Cannot read file.", toRead.length, totalRead);
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
   // read a file using fetchBlockByteRange()
-  private boolean checkFile2(FSDataInputStream in) {
-    byte[] toRead = new byte[FILE_SIZE];
+  private boolean checkFile2(FSDataInputStream in, byte[] expected) {
+    byte[] toRead = new byte[expected.length];
     try {
       assertEquals("Cannot read file", toRead.length,
           in.read(0, toRead, 0, toRead.length));
     } catch (IOException e) {
       return false;
     }
-    return checkFile(toRead);
+    return checkFile(toRead, expected);
   }
 
-  private boolean checkFile(byte[] fileToCheck) {
-    if (fileToCheck.length != rawData.length) {
+  private boolean checkFile(byte[] fileToCheck, byte[] expected) {
+    if (fileToCheck.length != expected.length) {
       return false;
     }
     for (int i = 0; i < fileToCheck.length; i++) {
-      if (fileToCheck[i] != rawData[i]) {
+      if (fileToCheck[i] != expected[i]) {
         return false;
       }
     }
@@ -137,7 +140,7 @@ private static FSDataOutputStream writeFile(FileSystem fileSys, Path name,
   }
 
   // try reading a block using a BlockReader directly
-  private static void tryRead(final Configuration conf, LocatedBlock lblock,
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
     IOException ioe = null;
@@ -148,7 +151,7 @@ private static void tryRead(final Configuration conf, LocatedBlock lblock,
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
 
       blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
-          setFileName(BlockReaderFactory.getFileName(targetAddr,
+          setFileName(BlockReaderFactory.getFileName(targetAddr,
               "test-blockpoolid", block.getBlockId())).
           setBlock(block).
           setBlockToken(lblock.getBlockToken()).
@@ -205,7 +208,7 @@ public Peer newConnectedPeer(InetSocketAddress addr,
   }
 
   // get a conf for testing
-  private static Configuration getConf(int numDataNodes) {
+  protected Configuration getConf(int numDataNodes) {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -241,16 +244,16 @@ public void testAppend() throws Exception {
       SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
       Path fileToAppend = new Path(FILE_TO_APPEND);
       FileSystem fs = cluster.getFileSystem();
-
+      byte[] expected = generateBytes(FILE_SIZE);
       // write a one-byte file
       FSDataOutputStream stm = writeFile(fs, fileToAppend,
           (short) numDataNodes, BLOCK_SIZE);
-      stm.write(rawData, 0, 1);
+      stm.write(expected, 0, 1);
       stm.close();
       // open the file again for append
       stm = fs.append(fileToAppend);
-      int mid = rawData.length - 1;
-      stm.write(rawData, 1, mid - 1);
+      int mid = expected.length - 1;
+      stm.write(expected, 1, mid - 1);
       stm.hflush();
 
       /*
@@ -267,11 +270,11 @@ public void testAppend() throws Exception {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // append the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if append is successful
       FSDataInputStream in5 = fs.open(fileToAppend);
-      assertTrue(checkFile1(in5));
+      assertTrue(checkFile1(in5, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -303,11 +306,12 @@ public void testWrite() throws Exception {
 
       Path fileToWrite = new Path(FILE_TO_WRITE);
       FileSystem fs = cluster.getFileSystem();
+      byte[] expected = generateBytes(FILE_SIZE);
       FSDataOutputStream stm = writeFile(fs, fileToWrite,
           (short) numDataNodes, BLOCK_SIZE);
       // write a partial block
-      int mid = rawData.length - 1;
-      stm.write(rawData, 0, mid);
+      int mid = expected.length - 1;
+      stm.write(expected, 0, mid);
       stm.hflush();
 
       /*
@@ -324,11 +328,11 @@ public void testWrite() throws Exception {
       // remove a datanode to force re-establishing pipeline
       cluster.stopDataNode(0);
       // write the rest of the file
-      stm.write(rawData, mid, rawData.length - mid);
+      stm.write(expected, mid, expected.length - mid);
       stm.close();
       // check if write is successful
       FSDataInputStream in4 = fs.open(fileToWrite);
-      assertTrue(checkFile1(in4));
+      assertTrue(checkFile1(in4, expected));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -346,125 +350,137 @@ public void testRead() throws Exception {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
       assertEquals(numDataNodes, cluster.getDataNodes().size());
+      doTestRead(conf, cluster, false);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
-      final NameNode nn = cluster.getNameNode();
-      final NamenodeProtocols nnProto = nn.getRpcServer();
-      final BlockManager bm = nn.getNamesystem().getBlockManager();
-      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+  protected void doTestRead(Configuration conf, MiniDFSCluster cluster,
+      boolean isStriped) throws Exception {
+    final int numDataNodes = cluster.getDataNodes().size();
+    final NameNode nn = cluster.getNameNode();
+    final NamenodeProtocols nnProto = nn.getRpcServer();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
-      // set a short token lifetime (1 second) initially
-      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+    // set a short token lifetime (1 second) initially
+    SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
 
-      Path fileToRead = new Path(FILE_TO_READ);
-      FileSystem fs = cluster.getFileSystem();
-      createFile(fs, fileToRead);
+    Path fileToRead = new Path(FILE_TO_READ);
+    FileSystem fs = cluster.getFileSystem();
+    byte[] expected = generateBytes(FILE_SIZE);
+    createFile(fs, fileToRead, expected);
 
     /*
      * setup for testing expiration handling of cached tokens
      */
 
-      // read using blockSeekTo(). Acquired tokens are cached in in1
-      FSDataInputStream in1 = fs.open(fileToRead);
-      assertTrue(checkFile1(in1));
-      // read using blockSeekTo(). Acquired tokens are cached in in2
-      FSDataInputStream in2 = fs.open(fileToRead);
-      assertTrue(checkFile1(in2));
-      // read using fetchBlockByteRange(). Acquired tokens are cached in in3
-      FSDataInputStream in3 = fs.open(fileToRead);
-      assertTrue(checkFile2(in3));
+    // read using blockSeekTo(). Acquired tokens are cached in in1
+    FSDataInputStream in1 = fs.open(fileToRead);
+    assertTrue(checkFile1(in1, expected));
+    // read using blockSeekTo(). Acquired tokens are cached in in2
+    FSDataInputStream in2 = fs.open(fileToRead);
+    assertTrue(checkFile1(in2, expected));
+    // read using fetchBlockByteRange(). Acquired tokens are cached in in3
+    FSDataInputStream in3 = fs.open(fileToRead);
+    assertTrue(checkFile2(in3, expected));
 
     /*
      * testing READ interface on DN using a BlockReader
      */
-      DFSClient client = null;
-      try {
-        client = new DFSClient(new InetSocketAddress("localhost",
+    DFSClient client = null;
+    try {
+      client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
-      } finally {
-        if (client != null) client.close();
-      }
-      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
-          FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
-      LocatedBlock lblock = locatedBlocks.get(0); // first block
-      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
-      // verify token is not expired
-      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read with valid token, should succeed
-      tryRead(conf, lblock, true);
+    } finally {
+      if (client != null) client.close();
+    }
+    List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
+        FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
+    LocatedBlock lblock = locatedBlocks.get(0); // first block
+    // verify token is not expired
+    assertFalse(isBlockTokenExpired(lblock));
+    // read with valid token, should succeed
+    tryRead(conf, lblock, true);
 
     /*
      * wait till myToken and all cached tokens in in1, in2 and in3 expire
      */
-      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException ignored) {
-        }
+    while (!isBlockTokenExpired(lblock)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ignored) {
       }
+    }
 
     /*
      * continue testing READ interface on DN using a BlockReader
      */
-      // verify token is expired
-      assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a valid new token
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-          EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should succeed
-      tryRead(conf, lblock, true);
-      // use a token with wrong blockID
-      ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
-          .getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
-      lblock.setBlockToken(sm.generateToken(wrongBlock,
-          EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
-      // read should fail
-      tryRead(conf, lblock, false);
-      // use a token with wrong access modes
-      lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
-          EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE,
-              BlockTokenIdentifier.AccessMode.COPY,
-              BlockTokenIdentifier.AccessMode.REPLACE)));
-      // read should fail
-      tryRead(conf, lblock, false);
+    // verify token is expired
+    assertTrue(isBlockTokenExpired(lblock));
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a valid new token
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    // read should succeed
+    tryRead(conf, lblock, true);
+    // use a token with wrong blockID
+    long rightId = lblock.getBlock().getBlockId();
+    long wrongId = rightId + 1;
+    lblock.getBlock().setBlockId(wrongId);
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ);
+    lblock.getBlock().setBlockId(rightId);
+    // read should fail
+    tryRead(conf, lblock, false);
+    // use a token with wrong access modes
+    bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE);
+    // read should fail
+    tryRead(conf, lblock, false);
 
-      // set a long token lifetime for future tokens
-      SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
+    // set a long token lifetime for future tokens
+    SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
 
    /*
     * testing that when cached tokens are expired, DFSClient will re-fetch
     * tokens transparently for READ.
     */
 
-      // confirm all tokens cached in in1 are expired by now
-      List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
+    // confirm all tokens cached in in1 are expired by now
+    List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
 
-      // confirm all tokens cached in in2 are expired by now
-      List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() is able to re-fetch token transparently (testing
-      // via another interface method)
+    // confirm all tokens cached in in2 are expired by now
+    List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() is able to re-fetch token transparently (testing
+    // via another interface method)
+    if (isStriped) {
+      // striped block doesn't support seekToNewSource
+      in2.seek(0);
+    } else {
       assertTrue(in2.seekToNewSource(0));
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2, expected));
 
-      // confirm all tokens cached in in3 are expired by now
-      List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
+    // confirm all tokens cached in in3 are expired by now
+    List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertTrue(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3, expected));
 
     /*
      * testing that after datanodes are restarted on the same ports, cached
@@ -473,37 +489,42 @@ public void testRead() throws Exception {
      * new tokens can be fetched from namenode).
      */
 
-      // restart datanodes on the same ports that they currently use
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      cluster.shutdownNameNode(0);
+    // restart datanodes on the same ports that they currently use
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    cluster.shutdownNameNode(0);
 
-      // confirm tokens cached in in1 are still valid
-      lblocks = DFSTestUtil.getAllBlocks(in1);
-      for (LocatedBlock blk : lblocks) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
+    // confirm tokens cached in in1 are still valid
+    lblocks = DFSTestUtil.getAllBlocks(in1);
+    for (LocatedBlock blk : lblocks) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
 
-      // confirm tokens cached in in2 are still valid
-      lblocks2 = DFSTestUtil.getAllBlocks(in2);
-      for (LocatedBlock blk : lblocks2) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify blockSeekTo() still works (forced to use cached tokens)
+    // confirm tokens cached in in2 are still valid
+    lblocks2 = DFSTestUtil.getAllBlocks(in2);
+    for (LocatedBlock blk : lblocks2) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
+    }
+    assertTrue(checkFile1(in2, expected));
 
-      // confirm tokens cached in in3 are still valid
-      lblocks3 = DFSTestUtil.getAllBlocks(in3);
-      for (LocatedBlock blk : lblocks3) {
-        assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken()));
-      }
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    // confirm tokens cached in in3 are still valid
+    lblocks3 = DFSTestUtil.getAllBlocks(in3);
+    for (LocatedBlock blk : lblocks3) {
+      assertFalse(isBlockTokenExpired(blk));
+    }
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3, expected));
 
     /*
      * testing that when namenode is restarted, cached tokens should still
@@ -512,18 +533,23 @@ public void testRead() throws Exception {
      * setup for this test depends on the previous test.
      */
 
-      // restart the namenode and then shut it down for test
-      cluster.restartNameNode(0);
-      cluster.shutdownNameNode(0);
+    // restart the namenode and then shut it down for test
+    cluster.restartNameNode(0);
+    cluster.shutdownNameNode(0);
 
-      // verify blockSeekTo() still works (forced to use cached tokens)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify again blockSeekTo() still works (forced to use cached tokens)
+    // verify blockSeekTo() still works (forced to use cached tokens)
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
+    // verify again blockSeekTo() still works (forced to use cached tokens)
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() still works (forced to use cached tokens)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2, expected));
+
+    // verify fetchBlockByteRange() still works (forced to use cached tokens)
+    assertTrue(checkFile2(in3, expected));
 
     /*
      * testing that after both namenode and datanodes got restarted (namenode
@@ -532,58 +558,60 @@ public void testRead() throws Exception {
      * setup of this test depends on the previous test.
      */
 
-      // restore the cluster and restart the datanodes for test
-      cluster.restartNameNode(0);
-      assertTrue(cluster.restartDataNodes(true));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
+    // restore the cluster and restart the datanodes for test
+    cluster.restartNameNode(0);
+    assertTrue(cluster.restartDataNodes(true));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
 
-      // shutdown namenode so that DFSClient can't get new tokens from namenode
-      cluster.shutdownNameNode(0);
+    // shutdown namenode so that DFSClient can't get new tokens from namenode
+    cluster.shutdownNameNode(0);
 
-      // verify blockSeekTo() fails (cached tokens become invalid)
-      in1.seek(0);
-      assertFalse(checkFile1(in1));
-      // verify fetchBlockByteRange() fails (cached tokens become invalid)
-      assertFalse(checkFile2(in3));
+    // verify blockSeekTo() fails (cached tokens become invalid)
+    in1.seek(0);
+    assertFalse(checkFile1(in1, expected));
+    // verify fetchBlockByteRange() fails (cached tokens become invalid)
+    assertFalse(checkFile2(in3, expected));
 
-      // restart the namenode to allow DFSClient to re-fetch tokens
-      cluster.restartNameNode(0);
-      // verify blockSeekTo() works again (by transparently re-fetching
-      // tokens from namenode)
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
+    // restart the namenode to allow DFSClient to re-fetch tokens
+    cluster.restartNameNode(0);
+    // verify blockSeekTo() works again (by transparently re-fetching
+    // tokens from namenode)
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() works again (by transparently
-      // re-fetching tokens from namenode)
-      assertTrue(checkFile2(in3));
+    }
+    assertTrue(checkFile1(in2, expected));
+    // verify fetchBlockByteRange() works again (by transparently
+    // re-fetching tokens from namenode)
+    assertTrue(checkFile2(in3, expected));
 
     /*
      * testing that when datanodes are restarted on different ports, DFSClient
      * is able to re-fetch tokens transparently to connect to them
      */
 
-      // restart datanodes on newly assigned ports
-      assertTrue(cluster.restartDataNodes(false));
-      cluster.waitActive();
-      assertEquals(numDataNodes, cluster.getDataNodes().size());
-      // verify blockSeekTo() is able to re-fetch token transparently
-      in1.seek(0);
-      assertTrue(checkFile1(in1));
-      // verify blockSeekTo() is able to re-fetch token transparently
+    // restart datanodes on newly assigned ports
+    assertTrue(cluster.restartDataNodes(false));
+    cluster.waitActive();
+    assertEquals(numDataNodes, cluster.getDataNodes().size());
+    // verify blockSeekTo() is able to re-fetch token transparently
+    in1.seek(0);
+    assertTrue(checkFile1(in1, expected));
+    // verify blockSeekTo() is able to re-fetch token transparently
+    if (isStriped) {
+      in2.seek(0);
+    } else {
       in2.seekToNewSource(0);
-      assertTrue(checkFile1(in2));
-      // verify fetchBlockByteRange() is able to re-fetch token transparently
-      assertTrue(checkFile2(in3));
-
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
+    }
+    assertTrue(checkFile1(in2, expected));
+    // verify fetchBlockByteRange() is able to re-fetch token transparently
+    assertTrue(checkFile2(in3, expected));
+  }
 
   /**
    * Integration testing of access token, involving NN, DN, and Balancer
    */
@@ -593,4 +621,8 @@ public void testEnd2End() throws Exception {
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     new TestBalancer().integrationTest(conf);
   }
+
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken());
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
new file mode 100644
index 0000000000..e212917b6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
+
+  private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  private final static int numDNs = dataBlocks + parityBlocks + 2;
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+
+  {
+    BLOCK_SIZE = cellSize * stripesPerBlock;
+    FILE_SIZE = BLOCK_SIZE * dataBlocks * 3;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = getConf();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient()
+        .createErasureCodingZone("/", null, cellSize);
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private Configuration getConf() {
+    Configuration conf = super.getConf(numDNs);
+    conf.setInt("io.bytes.per.checksum", cellSize);
+    return conf;
+  }
+
+  @Test
+  @Override
+  public void testRead() throws Exception {
+    //TODO: DFSStripedInputStream handles token expiration
+//    doTestRead(conf, cluster, true);
+  }
+
+  @Test
+  @Override
+  public void testWrite() throws Exception {
+    //TODO: DFSStripedOutputStream handles token expiration
+  }
+
+  @Test
+  @Override
+  public void testAppend() throws Exception {
+    //TODO: support Append for striped file
+  }
+
+  @Test
+  @Override
+  public void testEnd2End() throws Exception {
+    //TODO: DFSStripedOutputStream handles token expiration
+  }
+
+  @Override
+  protected void tryRead(final Configuration conf, LocatedBlock lblock,
+      boolean shouldSucceed) {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lblock;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup(
+        lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      super.tryRead(conf, internalBlock, shouldSucceed);
+    }
+  }
+
+  @Override
+  protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException {
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup(
+        lsb, cellSize, dataBlocks, parityBlocks);
+    for (LocatedBlock internalBlock : internalBlocks) {
+      if (super.isBlockTokenExpired(internalBlock)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
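Note (commentary, not part of the patch): the striped suite reuses the entire base-class scenario through the protected hooks introduced above (getConf, tryRead, doTestRead, isBlockTokenExpired), expanding each block group into its internal blocks before delegating; the overridden tests are left as TODOs until the striped input/output streams handle token expiration. If useful during review, the striped variant can be run on its own with the usual surefire selector, e.g. mvn test -Dtest=TestBlockTokenWithDFSStriped.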