HDFS-2199. Move blockTokenSecretManager from FSNamesystem to BlockManager. Contributed by Uma Maheswara Rao G
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1152776 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8390152d08
commit
d68e38b78d
@ -626,6 +626,9 @@ Trunk (unreleased changes)
|
||||
HDFS-2212. Refactor double-buffering code out of EditLogOutputStreams.
|
||||
(todd via eli)
|
||||
|
||||
HDFS-2199. Move blockTokenSecretManager from FSNamesystem to BlockManager.
|
||||
(Uma Maheswara Rao G via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
@ -47,6 +48,9 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
||||
@ -58,6 +62,7 @@
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
/**
|
||||
@ -80,6 +85,25 @@ public class BlockManager {
|
||||
public volatile long scheduledReplicationBlocksCount = 0L;
|
||||
private volatile long excessBlocksCount = 0L;
|
||||
private volatile long pendingDeletionBlocksCount = 0L;
|
||||
private boolean isBlockTokenEnabled;
|
||||
private long blockKeyUpdateInterval;
|
||||
private long blockTokenLifetime;
|
||||
private BlockTokenSecretManager blockTokenSecretManager;
|
||||
|
||||
/** returns the isBlockTokenEnabled - true if block token enabled ,else false */
|
||||
public boolean isBlockTokenEnabled() {
|
||||
return isBlockTokenEnabled;
|
||||
}
|
||||
|
||||
/** get the block key update interval */
|
||||
public long getBlockKeyUpdateInterval() {
|
||||
return blockKeyUpdateInterval;
|
||||
}
|
||||
|
||||
/** get the BlockTokenSecretManager */
|
||||
public BlockTokenSecretManager getBlockTokenSecretManager() {
|
||||
return blockTokenSecretManager;
|
||||
}
|
||||
|
||||
/** Used by metrics */
|
||||
public long getPendingReplicationBlocksCount() {
|
||||
@ -170,6 +194,42 @@ public long getExcessBlocksCount() {
|
||||
/** for block replicas placement */
|
||||
private BlockPlacementPolicy blockplacement;
|
||||
|
||||
/**
|
||||
* Get access keys
|
||||
*
|
||||
* @return current access keys
|
||||
*/
|
||||
public ExportedBlockKeys getBlockKeys() {
|
||||
return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
|
||||
: ExportedBlockKeys.DUMMY_KEYS;
|
||||
}
|
||||
|
||||
/** Generate block token for a LocatedBlock. */
|
||||
public void setBlockToken(LocatedBlock l) throws IOException {
|
||||
Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
|
||||
.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
|
||||
l.setBlockToken(token);
|
||||
}
|
||||
|
||||
/** Generate block tokens for the blocks to be returned. */
|
||||
public void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
|
||||
for(LocatedBlock l : locatedBlocks) {
|
||||
setBlockToken(l);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update access keys.
|
||||
*/
|
||||
public void updateBlockKey() throws IOException {
|
||||
this.blockTokenSecretManager.updateKeys();
|
||||
synchronized (namesystem.heartbeats) {
|
||||
for (DatanodeDescriptor nodeInfo : namesystem.heartbeats) {
|
||||
nodeInfo.needKeyUpdate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
||||
namesystem = fsn;
|
||||
datanodeManager = new DatanodeManager(fsn, conf);
|
||||
@ -179,7 +239,26 @@ public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
||||
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
||||
this.isBlockTokenEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
||||
if (isBlockTokenEnabled) {
|
||||
if (isBlockTokenEnabled) {
|
||||
this.blockKeyUpdateInterval = conf.getLong(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
|
||||
this.blockTokenLifetime = conf.getLong(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
|
||||
}
|
||||
|
||||
blockTokenSecretManager = new BlockTokenSecretManager(true,
|
||||
blockKeyUpdateInterval, blockTokenLifetime);
|
||||
}
|
||||
LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
|
||||
+ " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
|
||||
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
|
||||
+ " min(s)");
|
||||
this.maxCorruptFilesReturned = conf.getInt(
|
||||
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
|
||||
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
|
||||
|
@ -469,7 +469,7 @@ public void registerDatanode(DatanodeRegistration nodeReg
|
||||
nodeReg.getInfoPort(),
|
||||
nodeReg.getIpcPort());
|
||||
nodeReg.updateRegInfo(dnReg);
|
||||
nodeReg.exportedKeys = namesystem.getBlockKeys();
|
||||
nodeReg.exportedKeys = namesystem.getBlockManager().getBlockKeys();
|
||||
|
||||
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
|
||||
+ "node registration from " + nodeReg.getName()
|
||||
|
@ -43,9 +43,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@ -90,9 +88,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
@ -100,7 +96,6 @@
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
@ -112,15 +107,9 @@
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
@ -221,10 +210,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
|
||||
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
|
||||
private long blockPoolUsed = 0L;
|
||||
private int totalLoad = 0;
|
||||
boolean isBlockTokenEnabled;
|
||||
BlockTokenSecretManager blockTokenSecretManager;
|
||||
private long blockKeyUpdateInterval;
|
||||
private long blockTokenLifetime;
|
||||
|
||||
// Scan interval is not configurable.
|
||||
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
|
||||
@ -334,10 +319,6 @@ private void initialize(Configuration conf, FSImage fsImage)
|
||||
this.dir = new FSDirectory(fsImage, this, conf);
|
||||
}
|
||||
this.safeMode = new SafeModeInfo(conf);
|
||||
if (isBlockTokenEnabled) {
|
||||
blockTokenSecretManager = new BlockTokenSecretManager(true,
|
||||
blockKeyUpdateInterval, blockTokenLifetime);
|
||||
}
|
||||
}
|
||||
|
||||
void activateSecretManager() throws IOException {
|
||||
@ -499,21 +480,6 @@ private void setConfigurationParameters(Configuration conf)
|
||||
this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
|
||||
this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
|
||||
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
|
||||
this.isBlockTokenEnabled = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
|
||||
if (isBlockTokenEnabled) {
|
||||
this.blockKeyUpdateInterval = conf.getLong(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
|
||||
this.blockTokenLifetime = conf.getLong(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
|
||||
}
|
||||
LOG.info("isBlockTokenEnabled=" + isBlockTokenEnabled
|
||||
+ " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
|
||||
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
|
||||
+ " min(s)");
|
||||
|
||||
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
|
||||
}
|
||||
@ -646,15 +612,6 @@ BlocksWithLocations getBlocks(DatanodeID datanode, long size)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get access keys
|
||||
*
|
||||
* @return current access keys
|
||||
*/
|
||||
public ExportedBlockKeys getBlockKeys() {
|
||||
return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
|
||||
: ExportedBlockKeys.DUMMY_KEYS;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
//
|
||||
@ -853,9 +810,9 @@ LocatedBlocks getBlockLocationsInternal(INodeFile inode,
|
||||
.getBlockLocation(last, n - last.getNumBytes()) : blockManager
|
||||
.getBlockLocation(last, n);
|
||||
|
||||
if (isBlockTokenEnabled && needBlockToken) {
|
||||
setBlockTokens(locatedblocks);
|
||||
setBlockToken(lastBlock);
|
||||
if (blockManager.isBlockTokenEnabled() && needBlockToken) {
|
||||
blockManager.setBlockTokens(locatedblocks);
|
||||
blockManager.setBlockToken(lastBlock);
|
||||
}
|
||||
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
|
||||
lastBlock, last.isComplete());
|
||||
@ -868,19 +825,6 @@ public LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locat
|
||||
return new LocatedBlock(getExtendedBlock(b), locations, offset, corrupt);
|
||||
}
|
||||
|
||||
/** Generate block tokens for the blocks to be returned. */
|
||||
private void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
|
||||
for(LocatedBlock l : locatedBlocks) {
|
||||
setBlockToken(l);
|
||||
}
|
||||
}
|
||||
|
||||
/** Generate block token for a LocatedBlock. */
|
||||
private void setBlockToken(LocatedBlock l) throws IOException {
|
||||
Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
|
||||
.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
|
||||
l.setBlockToken(token);
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves all the blocks from srcs and appends them to trg
|
||||
@ -1369,8 +1313,8 @@ private LocatedBlock startFileInternal(String src,
|
||||
LocatedBlock lb =
|
||||
blockManager.convertLastBlockToUnderConstruction(cons);
|
||||
|
||||
if (lb != null && isBlockTokenEnabled) {
|
||||
lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
|
||||
if (lb != null && blockManager.isBlockTokenEnabled()) {
|
||||
lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(),
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
|
||||
}
|
||||
return lb;
|
||||
@ -1659,8 +1603,8 @@ public LocatedBlock getAdditionalBlock(String src,
|
||||
|
||||
// Create next block
|
||||
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
|
||||
if (isBlockTokenEnabled) {
|
||||
b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
|
||||
if (blockManager.isBlockTokenEnabled()) {
|
||||
b.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(b.getBlock(),
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
|
||||
}
|
||||
return b;
|
||||
@ -1708,8 +1652,8 @@ LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
|
||||
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
|
||||
excludes, preferredblocksize);
|
||||
final LocatedBlock lb = new LocatedBlock(blk, targets);
|
||||
if (isBlockTokenEnabled) {
|
||||
lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
|
||||
if (blockManager.isBlockTokenEnabled()) {
|
||||
lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(),
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
|
||||
}
|
||||
return lb;
|
||||
@ -2705,8 +2649,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|
||||
public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
|
||||
final DatanodeDescriptor nodeinfo) {
|
||||
// check access key update
|
||||
if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
|
||||
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
|
||||
if (blockManager.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
|
||||
cmds.add(new KeyUpdateCommand(blockManager.getBlockTokenSecretManager().exportKeys()));
|
||||
nodeinfo.needKeyUpdate = false;
|
||||
}
|
||||
}
|
||||
@ -2792,17 +2736,6 @@ public void run () {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update access keys.
|
||||
*/
|
||||
void updateBlockKey() throws IOException {
|
||||
this.blockTokenSecretManager.updateKeys();
|
||||
synchronized (heartbeats) {
|
||||
for (DatanodeDescriptor nodeInfo : heartbeats) {
|
||||
nodeInfo.needKeyUpdate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically calls heartbeatCheck() and updateBlockKey()
|
||||
@ -2820,8 +2753,9 @@ public void run() {
|
||||
heartbeatCheck();
|
||||
lastHeartbeatCheck = now;
|
||||
}
|
||||
if (isBlockTokenEnabled && (lastBlockKeyUpdate + blockKeyUpdateInterval < now)) {
|
||||
updateBlockKey();
|
||||
if (blockManager.isBlockTokenEnabled()
|
||||
&& (lastBlockKeyUpdate + blockManager.getBlockKeyUpdateInterval() < now)) {
|
||||
blockManager.updateBlockKey();
|
||||
lastBlockKeyUpdate = now;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -4355,8 +4289,8 @@ LocatedBlock updateBlockForPipeline(ExtendedBlock block,
|
||||
// get a new generation stamp and an access token
|
||||
block.setGenerationStamp(nextGenerationStamp());
|
||||
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
|
||||
if (isBlockTokenEnabled) {
|
||||
locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
|
||||
if (blockManager.isBlockTokenEnabled()) {
|
||||
locatedBlock.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(
|
||||
block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
|
||||
}
|
||||
} finally {
|
||||
|
@ -631,7 +631,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public ExportedBlockKeys getBlockKeys() throws IOException {
|
||||
return namesystem.getBlockKeys();
|
||||
return namesystem.getBlockManager().getBlockKeys();
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
@ -190,7 +190,7 @@ public void testAppend() throws Exception {
|
||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||
// set a short token lifetime (1 second)
|
||||
SecurityTestUtil.setBlockTokenLifetime(
|
||||
cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
|
||||
cluster.getNameNode().getNamesystem().getBlockManager().getBlockTokenSecretManager(), 1000L);
|
||||
Path fileToAppend = new Path(FILE_TO_APPEND);
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
|
||||
@ -246,7 +246,7 @@ public void testWrite() throws Exception {
|
||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||
// set a short token lifetime (1 second)
|
||||
SecurityTestUtil.setBlockTokenLifetime(
|
||||
cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
|
||||
cluster.getNameNode().getNamesystem().getBlockManager().getBlockTokenSecretManager(), 1000L);
|
||||
Path fileToWrite = new Path(FILE_TO_WRITE);
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
|
||||
@ -294,7 +294,9 @@ public void testRead() throws Exception {
|
||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||
// set a short token lifetime (1 second) initially
|
||||
SecurityTestUtil.setBlockTokenLifetime(
|
||||
cluster.getNameNode().getNamesystem().blockTokenSecretManager, 1000L);
|
||||
cluster.getNameNode()
|
||||
.getNamesystem().getBlockManager().getBlockTokenSecretManager(),
|
||||
1000L);
|
||||
Path fileToRead = new Path(FILE_TO_READ);
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
createFile(fs, fileToRead);
|
||||
@ -349,7 +351,8 @@ public void testRead() throws Exception {
|
||||
tryRead(conf, lblock, false);
|
||||
// use a valid new token
|
||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
||||
.blockTokenSecretManager.generateToken(lblock.getBlock(),
|
||||
.getBlockManager().getBlockTokenSecretManager().generateToken(
|
||||
lblock.getBlock(),
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
||||
// read should succeed
|
||||
tryRead(conf, lblock, true);
|
||||
@ -357,13 +360,15 @@ public void testRead() throws Exception {
|
||||
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
|
||||
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
|
||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
||||
.blockTokenSecretManager.generateToken(wrongBlock,
|
||||
.getBlockManager().getBlockTokenSecretManager().generateToken(wrongBlock,
|
||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
||||
// read should fail
|
||||
tryRead(conf, lblock, false);
|
||||
// use a token with wrong access modes
|
||||
lblock.setBlockToken(cluster.getNameNode().getNamesystem()
|
||||
.blockTokenSecretManager.generateToken(lblock.getBlock(), EnumSet.of(
|
||||
.getBlockManager().getBlockTokenSecretManager().generateToken(
|
||||
lblock.getBlock(),
|
||||
EnumSet.of(
|
||||
BlockTokenSecretManager.AccessMode.WRITE,
|
||||
BlockTokenSecretManager.AccessMode.COPY,
|
||||
BlockTokenSecretManager.AccessMode.REPLACE)));
|
||||
@ -372,7 +377,9 @@ public void testRead() throws Exception {
|
||||
|
||||
// set a long token lifetime for future tokens
|
||||
SecurityTestUtil.setBlockTokenLifetime(
|
||||
cluster.getNameNode().getNamesystem().blockTokenSecretManager, 600 * 1000L);
|
||||
cluster.getNameNode()
|
||||
.getNamesystem().getBlockManager().getBlockTokenSecretManager(),
|
||||
600 * 1000L);
|
||||
|
||||
/*
|
||||
* testing that when cached tokens are expired, DFSClient will re-fetch
|
||||
|
Loading…
Reference in New Issue
Block a user