HDFS-17383: Datanode current block token should come from active NameNode in HA mode (#6562). Contributed by lei w.

Reviewed-by: Shuyan Zhang <zhangshuyan@apache.org>
Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
This commit is contained in:
Lei313 2024-04-15 18:35:53 +08:00 committed by GitHub
parent bd1a08b2cf
commit f49a4df797
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 155 additions and 21 deletions

View File

@ -141,9 +141,9 @@ public void checkAccess(Token<BlockTokenIdentifier> token,
/**
* See {@link BlockTokenSecretManager#addKeys(ExportedBlockKeys)}.
*/
public void addKeys(String bpid, ExportedBlockKeys exportedKeys)
throws IOException {
get(bpid).addKeys(exportedKeys);
// Thin delegator: looks up the object returned by get(bpid) (presumably the
// secret manager for that block pool -- confirm against get()) and forwards
// both the exported keys and the updateCurrentKey flag to it unchanged.
public void addKeys(String bpid, ExportedBlockKeys exportedKeys,
boolean updateCurrentKey) throws IOException {
get(bpid).addKeys(exportedKeys, updateCurrentKey);
}
/**

View File

@ -218,17 +218,23 @@ private synchronized void removeExpiredKeys() {
}
}
/**
 * Convenience overload equivalent to {@code addKeys(exportedKeys, true)},
 * i.e. it asks the two-argument variant to update the current key.
 *
 * @param exportedKeys the keys to hand to the two-argument variant
 * @throws IOException if the two-argument variant throws it
 */
public synchronized void addKeys(ExportedBlockKeys exportedKeys) throws IOException {
addKeys(exportedKeys, true);
}
/**
* Set block keys, only to be used in worker mode
*/
public synchronized void addKeys(ExportedBlockKeys exportedKeys)
throws IOException {
public synchronized void addKeys(ExportedBlockKeys exportedKeys,
boolean updateCurrentKey) throws IOException {
if (isMaster || exportedKeys == null) {
return;
}
LOG.info("Setting block keys. BlockPool = {} .", blockPoolId);
removeExpiredKeys();
this.currentKey = exportedKeys.getCurrentKey();
if (updateCurrentKey || currentKey == null) {
this.currentKey = exportedKeys.getCurrentKey();
}
BlockKey[] receivedKeys = exportedKeys.getAllKeys();
for (int i = 0; i < receivedKeys.length; i++) {
if (receivedKeys[i] != null) {

View File

@ -789,7 +789,7 @@ void enableRMTerminationForTesting() {
checkNSRunning = false;
}
private boolean isBlockTokenEnabled() {
// Reports block tokens as enabled exactly when blockTokenSecretManager is
// non-null; no other state is consulted.
protected boolean isBlockTokenEnabled() {
return blockTokenSecretManager != null;
}

View File

@ -2055,10 +2055,13 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
}
}
public void markAllDatanodesStale() {
LOG.info("Marking all datanodes as stale");
public void markAllDatanodesStaleAndSetKeyUpdateIfNeed() {
LOG.info("Marking all datanodes as stale and schedule update block token if need.");
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
if (blockManager.isBlockTokenEnabled()) {
dn.setNeedKeyUpdate(true);
}
for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
storage.markStaleAfterFailover();
}

View File

@ -427,8 +427,10 @@ void registrationSucceeded(BPServiceActor bpServiceActor,
dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
// Add the initial block token secret keys to the DN's secret manager.
if (dn.isBlockTokenEnabled) {
boolean updateCurrentKey = bpServiceActor.state == null
|| bpServiceActor.state == HAServiceState.ACTIVE;
dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
reg.getExportedKeys());
reg.getExportedKeys(), updateCurrentKey);
}
} finally {
writeUnlock();
@ -781,7 +783,7 @@ assert getBlockPoolId().equals(bp) :
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(),
((KeyUpdateCommand) cmd).getExportedKeys());
((KeyUpdateCommand) cmd).getExportedKeys(), true);
}
break;
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
@ -822,7 +824,7 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(),
((KeyUpdateCommand) cmd).getExportedKeys());
((KeyUpdateCommand) cmd).getExportedKeys(), false);
}
break;
case DatanodeProtocol.DNA_TRANSFER:

View File

@ -313,6 +313,7 @@ private void connectToNNAndHandshake() throws IOException {
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
state = nsInfo.getState();
/* set thread name again to include NamespaceInfo when it's available. */
this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

View File

@ -102,7 +102,7 @@ public class DataNodeMetrics {
final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles;
@Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@Metric("Count of network errors on the datanode")

View File

@ -1401,7 +1401,7 @@ void startActiveServices() throws IOException {
editLogTailer.catchupDuringFailover();
blockManager.setPostponeBlocksFromFuture(false);
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
blockManager.getBlockIdManager().applyImpendingGenerationStamp();

View File

@ -1124,7 +1124,7 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
return BlockOpResponseProto.parseDelimitedFrom(in);
}
}
public static void setFederatedConfiguration(MiniDFSCluster cluster,
Configuration conf) {
Set<String> nameservices = new HashSet<String>();

View File

@ -535,7 +535,7 @@ public void testReadSkipStaleStorage() throws Exception {
assertEquals(blockNum - count, blocks.length);
// set all storage stale
bm0.getDatanodeManager().markAllDatanodesStale();
bm0.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
assertEquals(0, blocks.length);

View File

@ -467,7 +467,7 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
bpMgr.addBlockPool(bpid, slaveHandler);
ExportedBlockKeys keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
bpMgr.addKeys(bpid, keys, true);
String[] storageIds = new String[] {"DS-9001"};
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
new StorageType[]{StorageType.DEFAULT}, storageIds);
@ -480,7 +480,7 @@ private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
null);
keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
bpMgr.addKeys(bpid, keys, true);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.security.token.block;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
 * Checks which block key a DataNode treats as its current key when it talks
 * to the NameNodes of an HA mini cluster with block access tokens enabled.
 */
public class TestUpdateDataNodeCurrentKey {
  private static final short REPLICATION = (short)1;
  /** Upper bound on how long to wait for the DataNode to adopt a key. */
  private static final long KEY_SYNC_TIMEOUT_MS = 30000L;
  /** Pause between two consecutive key comparisons while waiting. */
  private static final long KEY_SYNC_POLL_INTERVAL_MS = 100L;

  private MiniDFSCluster cluster = null;
  private Configuration config;

  /**
   * Starts an HA mini cluster with block access tokens enabled and
   * {@link #REPLICATION} DataNode(s).
   */
  @Before
  public void setup() throws IOException {
    config = new Configuration();
    config.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 8);
    config.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 10);
    config.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
        12);
    config.setInt(
        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
        300);
    // Block tokens must be on, otherwise there are no block keys to compare.
    config.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    cluster = new MiniDFSCluster.Builder(config)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(REPLICATION).build();
    cluster.waitActive();
  }

  /** Shuts the mini cluster down if {@link #setup()} created one. */
  @After
  public void shutDownCluster() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * No NameNode is transitioned to active in this test; the DataNode must
   * still end up with a non-null current key.
   */
  @Test
  public void testUpdateDatanodeCurrentKeyWithStandbyNameNodes(){
    final String bpid = cluster.getNameNode(0).getFSImage().getBlockPoolID();
    final DataNode dataNode = cluster.getDataNodes().get(0);
    BlockKey currentKey = currentKeyOfDataNode(dataNode, bpid);
    Assert.assertNotNull(
        "DataNode has no current block key for block pool " + bpid,
        currentKey);
  }

  /**
   * After NameNode 0 becomes active, the DataNode's current key must become
   * equal to that NameNode's current key.
   */
  @Test
  public void testUpdateDatanodeCurrentKeyWithFailover() throws IOException,
      InterruptedException {
    cluster.transitionToActive(0);
    final String bpid = cluster.getNameNode(0).getFSImage().getBlockPoolID();
    final DataNode dataNode = cluster.getDataNodes().get(0);

    // Poll instead of sleeping for a fixed time: the moment at which the
    // DataNode learns the new key is not under this test's control, so a
    // fixed sleep is either too short (flaky) or needlessly long.
    final long deadlineNanos =
        System.nanoTime() + KEY_SYNC_TIMEOUT_MS * 1000000L;
    BlockKey annCurrentKey = currentKeyOfNameNode(0);
    BlockKey currentKey = currentKeyOfDataNode(dataNode, bpid);
    while (!(annCurrentKey != null && annCurrentKey.equals(currentKey))
        && System.nanoTime() - deadlineNanos < 0) {
      Thread.sleep(KEY_SYNC_POLL_INTERVAL_MS);
      annCurrentKey = currentKeyOfNameNode(0);
      currentKey = currentKeyOfDataNode(dataNode, bpid);
    }
    // Final assertion produces the failure message if the wait timed out.
    Assert.assertEquals(annCurrentKey, currentKey);
  }

  /**
   * With NameNode 0 active, a DataNode added later and NameNode 1 restarted,
   * both DataNodes must hold the active NameNode's current key.
   */
  @Test
  public void testUpdateDatanodeCurrentKeyFromActiveNameNode()
      throws IOException {
    cluster.transitionToActive(0);
    final DataNode oldDataNode = cluster.getDataNodes().get(0);
    // Add a new DataNode.
    cluster.startDataNodes(config, 1, true, null, null);
    final String bpid = cluster.getNamesystem(0).getBlockPoolId();

    final DatanodeInfo[] dataNodeInfos = cluster.getNameNodeRpc(0).
        getDatanodeReport(HdfsConstants.DatanodeReportType.LIVE);
    Assert.assertEquals(2, dataNodeInfos.length);

    // Simulate a restart of NameNode 1.
    cluster.restartNameNode(1, true);
    // Each DataNode's current key must equal the active NameNode's key.
    BlockKey currentKey = currentKeyOfNameNode(0);
    final DataNode newDataNode = cluster.getDataNodes().get(1);
    BlockKey dnCurrentKey = currentKeyOfDataNode(oldDataNode, bpid);
    BlockKey dn2CurrentKey = currentKeyOfDataNode(newDataNode, bpid);
    Assert.assertEquals(dnCurrentKey, dn2CurrentKey);
    Assert.assertEquals(currentKey, dn2CurrentKey);
  }

  /** Returns the current block key held by the NameNode at {@code nnIndex}. */
  private BlockKey currentKeyOfNameNode(int nnIndex) throws IOException {
    return cluster.getNameNode(nnIndex).
        getNamesystem().getBlockManager().
        getBlockTokenSecretManager().
        getCurrentKey();
  }

  /** Returns the current block key {@code dn} holds for block pool {@code bpid}. */
  private static BlockKey currentKeyOfDataNode(DataNode dn, String bpid) {
    return dn.getBlockPoolTokenSecretManager().
        get(bpid).getCurrentKey();
  }
}

View File

@ -522,7 +522,7 @@ public void testDeleteCorruptReplicaWithStatleStorages() throws Exception {
try {
cluster.waitActive();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(file);
for (int i = 0; i < 1024 * 1024 * 1; i++) {

View File

@ -65,8 +65,8 @@ public void testCorruptReplicaAfterFailover() throws Exception {
BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
// Mark datanodes as stale, as they would be after a namenode failover,
// to prevent replica deletion.
bm0.getDatanodeManager().markAllDatanodesStale();
bm1.getDatanodeManager().markAllDatanodesStale();
bm0.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
bm1.getDatanodeManager().markAllDatanodesStaleAndSetKeyUpdateIfNeed();
// Restart the datanode
cluster.restartDataNode(dn);
// The replica from the datanode will be having lesser genstamp, so