HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from BlockManager.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1158743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9c7e27cd2d
commit
b094465168
@ -665,6 +665,9 @@ Trunk (unreleased changes)
|
|||||||
|
|
||||||
HDFS-2233. Add WebUI tests with URI reserved chars. (eli)
|
HDFS-2233. Add WebUI tests with URI reserved chars. (eli)
|
||||||
|
|
||||||
|
HDFS-2265. Remove unnecessary BlockTokenSecretManager fields/methods from
|
||||||
|
BlockManager. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||||
|
@ -52,7 +52,7 @@ public class BlockTokenSecretManager extends
|
|||||||
public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
|
public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
|
||||||
|
|
||||||
private final boolean isMaster;
|
private final boolean isMaster;
|
||||||
/*
|
/**
|
||||||
* keyUpdateInterval is the interval that NN updates its block keys. It should
|
* keyUpdateInterval is the interval that NN updates its block keys. It should
|
||||||
* be set long enough so that all live DN's and Balancer should have sync'ed
|
* be set long enough so that all live DN's and Balancer should have sync'ed
|
||||||
* their block keys with NN at least once during each interval.
|
* their block keys with NN at least once during each interval.
|
||||||
@ -150,12 +150,24 @@ public synchronized void setKeys(ExportedBlockKeys exportedKeys)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update block keys if update time > update interval.
|
||||||
|
* @return true if the keys are updated.
|
||||||
|
*/
|
||||||
|
public boolean updateKeys(final long updateTime) throws IOException {
|
||||||
|
if (updateTime > keyUpdateInterval) {
|
||||||
|
return updateKeys();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update block keys, only to be used in master mode
|
* Update block keys, only to be used in master mode
|
||||||
*/
|
*/
|
||||||
public synchronized void updateKeys() throws IOException {
|
synchronized boolean updateKeys() throws IOException {
|
||||||
if (!isMaster)
|
if (!isMaster)
|
||||||
return;
|
return false;
|
||||||
|
|
||||||
LOG.info("Updating block keys");
|
LOG.info("Updating block keys");
|
||||||
removeExpiredKeys();
|
removeExpiredKeys();
|
||||||
// set final expiry date of retiring currentKey
|
// set final expiry date of retiring currentKey
|
||||||
@ -171,6 +183,7 @@ public synchronized void updateKeys() throws IOException {
|
|||||||
nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
|
nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
|
||||||
* keyUpdateInterval + tokenLifetime, generateSecret());
|
* keyUpdateInterval + tokenLifetime, generateSecret());
|
||||||
allKeys.put(nextKey.getKeyId(), nextKey);
|
allKeys.put(nextKey.getKeyId(), nextKey);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Generate an block token for current user */
|
/** Generate an block token for current user */
|
||||||
|
@ -80,21 +80,16 @@ public class BlockManager {
|
|||||||
|
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
|
|
||||||
|
private final DatanodeManager datanodeManager;
|
||||||
|
private final HeartbeatManager heartbeatManager;
|
||||||
|
private final BlockTokenSecretManager blockTokenSecretManager;
|
||||||
|
|
||||||
private volatile long pendingReplicationBlocksCount = 0L;
|
private volatile long pendingReplicationBlocksCount = 0L;
|
||||||
private volatile long corruptReplicaBlocksCount = 0L;
|
private volatile long corruptReplicaBlocksCount = 0L;
|
||||||
private volatile long underReplicatedBlocksCount = 0L;
|
private volatile long underReplicatedBlocksCount = 0L;
|
||||||
private volatile long scheduledReplicationBlocksCount = 0L;
|
private volatile long scheduledReplicationBlocksCount = 0L;
|
||||||
private volatile long excessBlocksCount = 0L;
|
private volatile long excessBlocksCount = 0L;
|
||||||
private volatile long pendingDeletionBlocksCount = 0L;
|
private volatile long pendingDeletionBlocksCount = 0L;
|
||||||
private boolean isBlockTokenEnabled;
|
|
||||||
private long blockKeyUpdateInterval;
|
|
||||||
private long blockTokenLifetime;
|
|
||||||
private BlockTokenSecretManager blockTokenSecretManager;
|
|
||||||
|
|
||||||
/** get the BlockTokenSecretManager */
|
|
||||||
public BlockTokenSecretManager getBlockTokenSecretManager() {
|
|
||||||
return blockTokenSecretManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Used by metrics */
|
/** Used by metrics */
|
||||||
public long getPendingReplicationBlocksCount() {
|
public long getPendingReplicationBlocksCount() {
|
||||||
@ -130,9 +125,6 @@ public long getExcessBlocksCount() {
|
|||||||
*/
|
*/
|
||||||
final BlocksMap blocksMap;
|
final BlocksMap blocksMap;
|
||||||
|
|
||||||
private final DatanodeManager datanodeManager;
|
|
||||||
private final HeartbeatManager heartbeatManager;
|
|
||||||
|
|
||||||
/** Replication thread. */
|
/** Replication thread. */
|
||||||
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
||||||
|
|
||||||
@ -197,26 +189,9 @@ public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
|||||||
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
||||||
this.isBlockTokenEnabled = conf.getBoolean(
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
|
||||||
if (isBlockTokenEnabled) {
|
|
||||||
if (isBlockTokenEnabled) {
|
|
||||||
this.blockKeyUpdateInterval = conf.getLong(
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
|
|
||||||
this.blockTokenLifetime = conf.getLong(
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
|
|
||||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
|
|
||||||
}
|
|
||||||
|
|
||||||
blockTokenSecretManager = new BlockTokenSecretManager(true,
|
blockTokenSecretManager = createBlockTokenSecretManager(conf);
|
||||||
blockKeyUpdateInterval, blockTokenLifetime);
|
|
||||||
}
|
|
||||||
LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
|
|
||||||
+ " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
|
|
||||||
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
|
|
||||||
+ " min(s)");
|
|
||||||
this.maxCorruptFilesReturned = conf.getInt(
|
this.maxCorruptFilesReturned = conf.getInt(
|
||||||
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
|
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
|
||||||
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
|
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
|
||||||
@ -260,6 +235,46 @@ public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
|||||||
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
|
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static BlockTokenSecretManager createBlockTokenSecretManager(
|
||||||
|
final Configuration conf) throws IOException {
|
||||||
|
final boolean isEnabled = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
||||||
|
LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY + "=" + isEnabled);
|
||||||
|
|
||||||
|
if (!isEnabled) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long updateMin = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT);
|
||||||
|
final long lifetimeMin = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
|
||||||
|
LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
|
||||||
|
+ "=" + updateMin + " min(s), "
|
||||||
|
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
|
||||||
|
+ "=" + lifetimeMin + " min(s)");
|
||||||
|
return new BlockTokenSecretManager(true,
|
||||||
|
updateMin*60*1000L, lifetimeMin*60*1000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** get the BlockTokenSecretManager */
|
||||||
|
BlockTokenSecretManager getBlockTokenSecretManager() {
|
||||||
|
return blockTokenSecretManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isBlockTokenEnabled() {
|
||||||
|
return blockTokenSecretManager != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Should the access keys be updated? */
|
||||||
|
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
|
||||||
|
return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
|
||||||
|
: false;
|
||||||
|
}
|
||||||
|
|
||||||
public void activate(Configuration conf) {
|
public void activate(Configuration conf) {
|
||||||
pendingReplications.start();
|
pendingReplications.start();
|
||||||
datanodeManager.activate(conf);
|
datanodeManager.activate(conf);
|
||||||
@ -599,7 +614,7 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
|
|||||||
: fileSizeExcludeBlocksUnderConstruction;
|
: fileSizeExcludeBlocksUnderConstruction;
|
||||||
final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
|
final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
|
||||||
|
|
||||||
if (isBlockTokenEnabled && needBlockToken) {
|
if (isBlockTokenEnabled() && needBlockToken) {
|
||||||
for(LocatedBlock lb : locatedblocks) {
|
for(LocatedBlock lb : locatedblocks) {
|
||||||
setBlockToken(lb, AccessMode.READ);
|
setBlockToken(lb, AccessMode.READ);
|
||||||
}
|
}
|
||||||
@ -613,14 +628,14 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
|
|||||||
|
|
||||||
/** @return current access keys. */
|
/** @return current access keys. */
|
||||||
public ExportedBlockKeys getBlockKeys() {
|
public ExportedBlockKeys getBlockKeys() {
|
||||||
return isBlockTokenEnabled? blockTokenSecretManager.exportKeys()
|
return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
|
||||||
: ExportedBlockKeys.DUMMY_KEYS;
|
: ExportedBlockKeys.DUMMY_KEYS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Generate a block token for the located block. */
|
/** Generate a block token for the located block. */
|
||||||
public void setBlockToken(final LocatedBlock b,
|
public void setBlockToken(final LocatedBlock b,
|
||||||
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
||||||
if (isBlockTokenEnabled) {
|
if (isBlockTokenEnabled()) {
|
||||||
b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
|
b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
|
||||||
EnumSet.of(mode)));
|
EnumSet.of(mode)));
|
||||||
}
|
}
|
||||||
@ -629,7 +644,7 @@ public void setBlockToken(final LocatedBlock b,
|
|||||||
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
|
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
|
||||||
final DatanodeDescriptor nodeinfo) {
|
final DatanodeDescriptor nodeinfo) {
|
||||||
// check access key update
|
// check access key update
|
||||||
if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
|
if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
|
||||||
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
|
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
|
||||||
nodeinfo.needKeyUpdate = false;
|
nodeinfo.needKeyUpdate = false;
|
||||||
}
|
}
|
||||||
@ -2368,16 +2383,6 @@ public BlockInfo getStoredBlock(Block block) {
|
|||||||
return blocksMap.getStoredBlock(block);
|
return blocksMap.getStoredBlock(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** Should the access keys be updated? */
|
|
||||||
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
|
|
||||||
final boolean b = isBlockTokenEnabled && blockKeyUpdateInterval < updateTime;
|
|
||||||
if (b) {
|
|
||||||
blockTokenSecretManager.updateKeys();
|
|
||||||
}
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** updates a block in under replication queue */
|
/** updates a block in under replication queue */
|
||||||
private void updateNeededReplications(final Block block,
|
private void updateNeededReplications(final Block block,
|
||||||
final int curReplicasDelta, int expectedReplicasDelta) {
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
||||||
|
@ -15,7 +15,12 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@ -26,6 +31,10 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
@ -34,21 +43,20 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.*;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.security.token.*;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
public class TestBlockTokenWithDFS {
|
||||||
|
|
||||||
public class TestBlockTokenWithDFS extends TestCase {
|
|
||||||
|
|
||||||
private static final int BLOCK_SIZE = 1024;
|
private static final int BLOCK_SIZE = 1024;
|
||||||
private static final int FILE_SIZE = 2 * BLOCK_SIZE;
|
private static final int FILE_SIZE = 2 * BLOCK_SIZE;
|
||||||
@ -175,10 +183,11 @@ private static Configuration getConf(int numDataNodes) throws IOException {
|
|||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* testing that APPEND operation can handle token expiration when
|
* testing that APPEND operation can handle token expiration when
|
||||||
* re-establishing pipeline is needed
|
* re-establishing pipeline is needed
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testAppend() throws Exception {
|
public void testAppend() throws Exception {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
int numDataNodes = 2;
|
int numDataNodes = 2;
|
||||||
@ -188,9 +197,13 @@ public void testAppend() throws Exception {
|
|||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||||
|
|
||||||
|
final NameNode nn = cluster.getNameNode();
|
||||||
|
final BlockManager bm = nn.getNamesystem().getBlockManager();
|
||||||
|
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
|
||||||
|
|
||||||
// set a short token lifetime (1 second)
|
// set a short token lifetime (1 second)
|
||||||
SecurityTestUtil.setBlockTokenLifetime(
|
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
|
||||||
cluster.getNameNode().getNamesystem().getBlockManager().getBlockTokenSecretManager(), 1000L);
|
|
||||||
Path fileToAppend = new Path(FILE_TO_APPEND);
|
Path fileToAppend = new Path(FILE_TO_APPEND);
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
@ -231,10 +244,11 @@ public void testAppend() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* testing that WRITE operation can handle token expiration when
|
* testing that WRITE operation can handle token expiration when
|
||||||
* re-establishing pipeline is needed
|
* re-establishing pipeline is needed
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testWrite() throws Exception {
|
public void testWrite() throws Exception {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
int numDataNodes = 2;
|
int numDataNodes = 2;
|
||||||
@ -244,9 +258,13 @@ public void testWrite() throws Exception {
|
|||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||||
|
|
||||||
|
final NameNode nn = cluster.getNameNode();
|
||||||
|
final BlockManager bm = nn.getNamesystem().getBlockManager();
|
||||||
|
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
|
||||||
|
|
||||||
// set a short token lifetime (1 second)
|
// set a short token lifetime (1 second)
|
||||||
SecurityTestUtil.setBlockTokenLifetime(
|
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
|
||||||
cluster.getNameNode().getNamesystem().getBlockManager().getBlockTokenSecretManager(), 1000L);
|
|
||||||
Path fileToWrite = new Path(FILE_TO_WRITE);
|
Path fileToWrite = new Path(FILE_TO_WRITE);
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
@ -283,6 +301,7 @@ public void testWrite() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testRead() throws Exception {
|
public void testRead() throws Exception {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
int numDataNodes = 2;
|
int numDataNodes = 2;
|
||||||
@ -292,11 +311,14 @@ public void testRead() throws Exception {
|
|||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||||
|
|
||||||
|
final NameNode nn = cluster.getNameNode();
|
||||||
|
final BlockManager bm = nn.getNamesystem().getBlockManager();
|
||||||
|
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
|
||||||
|
|
||||||
// set a short token lifetime (1 second) initially
|
// set a short token lifetime (1 second) initially
|
||||||
SecurityTestUtil.setBlockTokenLifetime(
|
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
|
||||||
cluster.getNameNode()
|
|
||||||
.getNamesystem().getBlockManager().getBlockTokenSecretManager(),
|
|
||||||
1000L);
|
|
||||||
Path fileToRead = new Path(FILE_TO_READ);
|
Path fileToRead = new Path(FILE_TO_READ);
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
createFile(fs, fileToRead);
|
createFile(fs, fileToRead);
|
||||||
@ -321,7 +343,7 @@ public void testRead() throws Exception {
|
|||||||
|
|
||||||
new DFSClient(new InetSocketAddress("localhost",
|
new DFSClient(new InetSocketAddress("localhost",
|
||||||
cluster.getNameNodePort()), conf);
|
cluster.getNameNodePort()), conf);
|
||||||
List<LocatedBlock> locatedBlocks = cluster.getNameNode().getBlockLocations(
|
List<LocatedBlock> locatedBlocks = nn.getBlockLocations(
|
||||||
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
|
FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
|
||||||
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
||||||
Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
|
Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
|
||||||
@ -350,36 +372,27 @@ public void testRead() throws Exception {
|
|||||||
// read should fail
|
// read should fail
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
// use a valid new token
|
// use a valid new token
|
||||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
||||||
.getBlockManager().getBlockTokenSecretManager().generateToken(
|
|
||||||
lblock.getBlock(),
|
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
||||||
// read should succeed
|
// read should succeed
|
||||||
tryRead(conf, lblock, true);
|
tryRead(conf, lblock, true);
|
||||||
// use a token with wrong blockID
|
// use a token with wrong blockID
|
||||||
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
|
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
|
||||||
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
|
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
|
||||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
lblock.setBlockToken(sm.generateToken(wrongBlock,
|
||||||
.getBlockManager().getBlockTokenSecretManager().generateToken(wrongBlock,
|
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
||||||
// read should fail
|
// read should fail
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
// use a token with wrong access modes
|
// use a token with wrong access modes
|
||||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
||||||
.getBlockManager().getBlockTokenSecretManager().generateToken(
|
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE,
|
||||||
lblock.getBlock(),
|
|
||||||
EnumSet.of(
|
|
||||||
BlockTokenSecretManager.AccessMode.WRITE,
|
|
||||||
BlockTokenSecretManager.AccessMode.COPY,
|
BlockTokenSecretManager.AccessMode.COPY,
|
||||||
BlockTokenSecretManager.AccessMode.REPLACE)));
|
BlockTokenSecretManager.AccessMode.REPLACE)));
|
||||||
// read should fail
|
// read should fail
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
|
|
||||||
// set a long token lifetime for future tokens
|
// set a long token lifetime for future tokens
|
||||||
SecurityTestUtil.setBlockTokenLifetime(
|
SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L);
|
||||||
cluster.getNameNode()
|
|
||||||
.getNamesystem().getBlockManager().getBlockTokenSecretManager(),
|
|
||||||
600 * 1000L);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* testing that when cached tokens are expired, DFSClient will re-fetch
|
* testing that when cached tokens are expired, DFSClient will re-fetch
|
||||||
@ -531,9 +544,10 @@ public void testRead() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Integration testing of access token, involving NN, DN, and Balancer
|
* Integration testing of access token, involving NN, DN, and Balancer
|
||||||
*/
|
*/
|
||||||
|
@Test
|
||||||
public void testEnd2End() throws Exception {
|
public void testEnd2End() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
Loading…
Reference in New Issue
Block a user