diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
index 980b474c99..31e4bd4a49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
@@ -141,9 +141,9 @@ public void checkAccess(Token<BlockTokenIdentifier> token,
   /**
    * See {@link BlockTokenSecretManager#addKeys(ExportedBlockKeys)}.
    */
-  public void addKeys(String bpid, ExportedBlockKeys exportedKeys)
-      throws IOException {
-    get(bpid).addKeys(exportedKeys);
+  public void addKeys(String bpid, ExportedBlockKeys exportedKeys,
+      boolean updateCurrentKey) throws IOException {
+    get(bpid).addKeys(exportedKeys, updateCurrentKey);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index b9f817db51..d2c3fe6eae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -218,17 +218,23 @@ private synchronized void removeExpiredKeys() {
     }
   }
 
+  public synchronized void addKeys(ExportedBlockKeys exportedKeys) throws IOException {
+    addKeys(exportedKeys, true);
+  }
+
   /**
    * Set block keys, only to be used in worker mode
    */
-  public synchronized void addKeys(ExportedBlockKeys exportedKeys)
-      throws IOException {
+  public synchronized void addKeys(ExportedBlockKeys exportedKeys,
+      boolean updateCurrentKey) throws IOException {
     if (isMaster || exportedKeys == null) {
       return;
     }
     LOG.info("Setting block keys. BlockPool = {} .", blockPoolId);
     removeExpiredKeys();
-    this.currentKey = exportedKeys.getCurrentKey();
+    if (updateCurrentKey || currentKey == null) {
+      this.currentKey = exportedKeys.getCurrentKey();
+    }
     BlockKey[] receivedKeys = exportedKeys.getAllKeys();
     for (int i = 0; i < receivedKeys.length; i++) {
       if (receivedKeys[i] != null) {
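Note on the widened contract: every delivery of `ExportedBlockKeys` still refreshes the DataNode's verification key set, but `currentKey` (the key used to sign newly issued tokens) now rotates only when the caller passes `updateCurrentKey = true`, or when it was never initialized. A minimal, self-contained sketch of that behavior — toy types only, `Integer` standing in for `BlockKey`; nothing below is the real Hadoop class:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the patched worker-side contract, under the assumptions stated above.
class WorkerKeyStoreSketch {
  private Integer currentKey;                       // stands in for BlockKey currentKey
  private final Map<Integer, Integer> allKeys = new HashMap<>();

  synchronized void addKeys(Integer exportedCurrentKey, int[] exportedAllKeys,
      boolean updateCurrentKey) {
    // currentKey rotates only on an authoritative delivery, or on the first one.
    if (updateCurrentKey || currentKey == null) {
      currentKey = exportedCurrentKey;
    }
    // The verification set is always refreshed, whatever updateCurrentKey says.
    for (int id : exportedAllKeys) {
      allKeys.put(id, id);
    }
  }

  public static void main(String[] args) {
    WorkerKeyStoreSketch store = new WorkerKeyStoreSketch();
    store.addKeys(1, new int[] {1, 2}, false);  // first delivery: initializes currentKey = 1
    store.addKeys(9, new int[] {9, 10}, false); // standby-style delivery: currentKey stays 1
    store.addKeys(9, new int[] {9, 10}, true);  // active-style delivery: rotates to 9
    System.out.println("currentKey = " + store.currentKey); // prints 9
  }
}
```

The `currentKey == null` escape hatch means a freshly registered DataNode can always sign tokens, even if its first key delivery happens to come from a standby NameNode.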
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 56c86482bd..f0c88f3755 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -789,7 +789,7 @@ void enableRMTerminationForTesting() {
     checkNSRunning = false;
   }
 
-  private boolean isBlockTokenEnabled() {
+  protected boolean isBlockTokenEnabled() {
     return blockTokenSecretManager != null;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 29327a1611..b3eb2fd7f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -2055,10 +2055,13 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
     }
   }
 
-  public void markAllDatanodesStale() {
-    LOG.info("Marking all datanodes as stale");
+  public void markAllDatanodesStaleAndSetKeyUpdateIfNeed() {
+    LOG.info("Marking all datanodes as stale and scheduling a block token key update if needed.");
     synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
+        if (blockManager.isBlockTokenEnabled()) {
+          dn.setNeedKeyUpdate(true);
+        }
         for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
           storage.markStaleAfterFailover();
         }
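With the rename, a failover now does two things to every registered DataNode: it marks the node's storages stale (as before) and flags the node for a block token key update, so the next heartbeat reply pushes fresh keys down as a `DNA_ACCESSKEYUPDATE` command. A rough, self-contained sketch of that flow — the toy `Dn` class and the `onHeartbeat` shape are assumptions for illustration, not the real `DatanodeManager` heartbeat code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the intent of markAllDatanodesStaleAndSetKeyUpdateIfNeed().
class FailoverKeyUpdateSketch {
  static class Dn {
    boolean needKeyUpdate;
  }

  static final boolean BLOCK_TOKEN_ENABLED = true;

  // Called when a NameNode transitions to active (the startActiveServices path).
  static void onTransitionToActive(List<Dn> datanodes) {
    for (Dn dn : datanodes) {
      if (BLOCK_TOKEN_ENABLED) {
        dn.needKeyUpdate = true;   // schedule a key push on the next heartbeat
      }
      // ... storages are marked stale exactly as before ...
    }
  }

  // Assumed heartbeat-side consumer: emits a key-update command once, then clears the flag.
  static List<String> onHeartbeat(Dn dn) {
    List<String> cmds = new ArrayList<>();
    if (BLOCK_TOKEN_ENABLED && dn.needKeyUpdate) {
      cmds.add("DNA_ACCESSKEYUPDATE");
      dn.needKeyUpdate = false;
    }
    return cmds;
  }
}
```

Because that command arrives via `processCommandFromActive` below, it is applied with `updateCurrentKey = true`, so the cluster converges on the new active NameNode's current key shortly after failover.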
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 7d5d05bac5..1a2c024c90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -427,8 +427,10 @@ void registrationSucceeded(BPServiceActor bpServiceActor,
       dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
       // Add the initial block token secret keys to the DN's secret manager.
       if (dn.isBlockTokenEnabled) {
+        boolean updateCurrentKey = bpServiceActor.state == null
+            || bpServiceActor.state == HAServiceState.ACTIVE;
         dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-            reg.getExportedKeys());
+            reg.getExportedKeys(), updateCurrentKey);
       }
     } finally {
       writeUnlock();
@@ -781,7 +783,7 @@ assert getBlockPoolId().equals(bp) :
       if (dn.isBlockTokenEnabled) {
         dn.blockPoolTokenSecretManager.addKeys(
             getBlockPoolId(),
-            ((KeyUpdateCommand) cmd).getExportedKeys());
+            ((KeyUpdateCommand) cmd).getExportedKeys(), true);
       }
       break;
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
@@ -822,7 +824,7 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
       if (dn.isBlockTokenEnabled) {
         dn.blockPoolTokenSecretManager.addKeys(
             getBlockPoolId(),
-            ((KeyUpdateCommand) cmd).getExportedKeys());
+            ((KeyUpdateCommand) cmd).getExportedKeys(), false);
       }
       break;
     case DatanodeProtocol.DNA_TRANSFER:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index da30d05796..a391b8b8fb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -313,6 +313,7 @@ private void connectToNNAndHandshake() throws IOException {
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.
     bpos.verifyAndSetNamespaceInfo(this, nsInfo);
+    state = nsInfo.getState();
 
     /* set thread name again to include NamespaceInfo when it's available. */
     this.bpThread.setName(formatThreadName("heartbeating", nnAddr));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index d5fcfc9544..2e902f694a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -102,7 +102,7 @@ public class DataNodeMetrics {
   final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles;
 
   @Metric MutableCounterLong fsyncCount;
-  
+
   @Metric MutableCounterLong volumeFailures;
   @Metric("Count of network errors on the datanode")
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 542cc37046..e21c243986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1401,7 +1401,7 @@ void startActiveServices() throws IOException {
         editLogTailer.catchupDuringFailover();
         blockManager.setPostponeBlocksFromFuture(false);
-        blockManager.getDatanodeManager().markAllDatanodesStale();
+        blockManager.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
         blockManager.getBlockIdManager().applyImpendingGenerationStamp();
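`BPServiceActor` now remembers the HA state reported in the handshake's `NamespaceInfo`, and `registrationSucceeded` uses it to decide whether the registration's exported keys may rotate `currentKey`. A standalone restatement of that decision — the enum here is a local stand-in for `org.apache.hadoop.ha.HAServiceProtocol.HAServiceState`:

```java
// Sketch: the registration-time decision added in BPOfferService#registrationSucceeded.
class RegistrationKeyDecisionSketch {
  enum HAServiceState { ACTIVE, STANDBY, OBSERVER }

  // null means the NameNode's HA state is not yet known; the patch treats that
  // as authoritative so a DataNode never starts up without a usable currentKey.
  static boolean updateCurrentKey(HAServiceState state) {
    return state == null || state == HAServiceState.ACTIVE;
  }

  public static void main(String[] args) {
    System.out.println(updateCurrentKey(null));                   // true  (unknown state)
    System.out.println(updateCurrentKey(HAServiceState.ACTIVE));  // true  (active NN)
    System.out.println(updateCurrentKey(HAServiceState.STANDBY)); // false (verify-only)
  }
}
```

Together with the command handlers above, only the active path ever passes `true`: `processCommandFromActive` rotates the signing key, while `processCommandFromStandby` merges keys for verification only.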
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 4ff57dd7e1..791f6529da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1124,7 +1124,7 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
       return BlockOpResponseProto.parseDelimitedFrom(in);
     }
   }
-  
+
   public static void setFederatedConfiguration(MiniDFSCluster cluster,
       Configuration conf) {
     Set<String> nameservices = new HashSet<String>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
index 5abb8adc14..ff751f2a68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -535,7 +535,7 @@ public void testReadSkipStaleStorage() throws Exception {
     assertEquals(blockNum - count, blocks.length);
 
     // set all storage stale
-    bm0.getDatanodeManager().markAllDatanodesStale();
+    bm0.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
     blocks = namenode.getBlocks(
         dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(0, blocks.length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index d08276b069..fe5cb36779 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -467,7 +467,7 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
     bpMgr.addBlockPool(bpid, slaveHandler);
 
     ExportedBlockKeys keys = masterHandler.exportKeys();
-    bpMgr.addKeys(bpid, keys);
+    bpMgr.addKeys(bpid, keys, true);
     String[] storageIds = new String[] {"DS-9001"};
     tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
         new StorageType[]{StorageType.DEFAULT}, storageIds);
@@ -480,7 +480,7 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
     tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
         null);
     keys = masterHandler.exportKeys();
-    bpMgr.addKeys(bpid, keys);
+    bpMgr.addKeys(bpid, keys, true);
     tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
         new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
     tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestUpdateDataNodeCurrentKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestUpdateDataNodeCurrentKey.java
new file mode 100644
index 0000000000..27922925ca
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestUpdateDataNodeCurrentKey.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestUpdateDataNodeCurrentKey {
+  private static final short REPLICATION = (short)1;
+  private MiniDFSCluster cluster = null;
+  private Configuration config;
+
+  @Before
+  public void setup() throws IOException {
+    config = new Configuration();
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 8);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 10);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        12);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        300);
+    config.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+    cluster = new MiniDFSCluster.Builder(config)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(REPLICATION).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testUpdateDatanodeCurrentKeyWithStandbyNameNodes() {
+    final String bpid = cluster.getNameNode(0).getFSImage().getBlockPoolID();
+    final DataNode dataNode = cluster.getDataNodes().get(0);
+    BlockKey currentKey = dataNode.getBlockPoolTokenSecretManager().
+        get(bpid).getCurrentKey();
+    Assert.assertNotNull(currentKey);
+  }
+
+  @Test
+  public void testUpdateDatanodeCurrentKeyWithFailover() throws IOException,
+      InterruptedException {
+    cluster.transitionToActive(0);
+    final String bpid = cluster.getNameNode(0).getFSImage().getBlockPoolID();
+    Thread.sleep(3000);
+    BlockKey annCurrentKey = cluster.getNameNode(0).
+        getNamesystem().getBlockManager().
+        getBlockTokenSecretManager().
+        getCurrentKey();
+    final DataNode dataNode = cluster.getDataNodes().get(0);
+    BlockKey currentKey = dataNode.getBlockPoolTokenSecretManager().
+        get(bpid).getCurrentKey();
+    Assert.assertEquals(annCurrentKey, currentKey);
+  }
+
+  @Test
+  public void testUpdateDatanodeCurrentKeyFromActiveNameNode()
+      throws IOException {
+    cluster.transitionToActive(0);
+    final DataNode oldDataNode = cluster.getDataNodes().get(0);
+    // Add a new datanode.
+    cluster.startDataNodes(config, 1, true, null, null);
+    final String bpid = cluster.getNamesystem(0).getBlockPoolId();
+
+    final DatanodeInfo[] dataNodeInfos = cluster.getNameNodeRpc(0).
+        getDatanodeReport(HdfsConstants.DatanodeReportType.LIVE);
+    Assert.assertEquals(2, dataNodeInfos.length);
+
+    // Simulate a NameNode restart.
+    cluster.restartNameNode(1, true);
+
+    // The DataNode currentKey should equal the active NameNode's currentKey.
+    BlockKey currentKey = cluster.getNameNode(0).getNamesystem().
+        getBlockManager().getBlockTokenSecretManager().
+        getCurrentKey();
+    final DataNode newDataNode = cluster.getDataNodes().get(1);
+    BlockKey dnCurrentKey = oldDataNode.getBlockPoolTokenSecretManager().
+        get(bpid).getCurrentKey();
+    BlockKey dn2CurrentKey = newDataNode.getBlockPoolTokenSecretManager().
+        get(bpid).getCurrentKey();
+    Assert.assertEquals(dnCurrentKey, dn2CurrentKey);
+    Assert.assertEquals(currentKey, dn2CurrentKey);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 8ce69a45eb..d9d236b664 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -522,7 +522,7 @@ public void testDeleteCorruptReplicaWithStatleStorages() throws Exception {
     try {
       cluster.waitActive();
       BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-      blockManager.getDatanodeManager().markAllDatanodesStale();
+      blockManager.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
       FileSystem fs = cluster.getFileSystem();
       FSDataOutputStream out = fs.create(file);
       for (int i = 0; i < 1024 * 1024 * 1; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
index 06e4f605fb..a5591ad35f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -65,8 +65,8 @@ public void testCorruptReplicaAfterFailover() throws Exception {
     BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
     // Mark datanodes as stale, as are marked if a namenode went through a
     // failover, to prevent replica deletion.
-    bm0.getDatanodeManager().markAllDatanodesStale();
-    bm1.getDatanodeManager().markAllDatanodesStale();
+    bm0.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
+    bm1.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
    // Restart the datanode
     cluster.restartDataNode(dn);
     // The replica from the datanode will be having lesser genstamp, so