HDFS-13603: do not propagate ExecutionException and add maxRetries limit to NameNode edek cache warmup (#6774)
This commit is contained in:
parent
d3b98cb1b2
commit
b4ddb2d3bb
@ -1422,6 +1422,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
|
||||
public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
|
||||
public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;
|
||||
public static final String DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_KEY =
|
||||
"dfs.namenode.edekcacheloader.max-retries";
|
||||
public static final int DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_DEFAULT = 10;
|
||||
public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY = "dfs.namenode.reencrypt.sleep.interval";
|
||||
public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT = "1m";
|
||||
public static final String DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY = "dfs.namenode.reencrypt.batch.size";
|
||||
|
@ -533,16 +533,16 @@ static boolean isInAnEZ(final FSDirectory fsd, final INodesInPath iip)
|
||||
}
|
||||
|
||||
/**
|
||||
* Proactively warm up the edek cache. We'll get all the edek key names,
|
||||
* then launch up a separate thread to warm them up.
|
||||
* Best-effort attempt to proactively warm up the edek cache. We'll get all the edek key names,
|
||||
* then launch up a separate thread to warm them up. Retries happen if any of keys fail to warm up.
|
||||
*/
|
||||
static void warmUpEdekCache(final ExecutorService executor,
|
||||
final FSDirectory fsd, final int delay, final int interval) {
|
||||
final FSDirectory fsd, final int delay, final int interval, final int maxRetries) {
|
||||
fsd.readLock();
|
||||
try {
|
||||
String[] edeks = fsd.ezManager.getKeyNames();
|
||||
executor.execute(
|
||||
new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval));
|
||||
new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval, maxRetries));
|
||||
} finally {
|
||||
fsd.readUnlock();
|
||||
}
|
||||
@ -557,19 +557,22 @@ static class EDEKCacheLoader implements Runnable {
|
||||
private final KeyProviderCryptoExtension kp;
|
||||
private int initialDelay;
|
||||
private int retryInterval;
|
||||
private int maxRetries;
|
||||
|
||||
EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp,
|
||||
final int delay, final int interval) {
|
||||
final int delay, final int interval, final int maxRetries) {
|
||||
this.keyNames = names;
|
||||
this.kp = kp;
|
||||
this.initialDelay = delay;
|
||||
this.retryInterval = interval;
|
||||
this.maxRetries = maxRetries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
NameNode.LOG.info("Warming up {} EDEKs... (initialDelay={}, "
|
||||
+ "retryInterval={})", keyNames.length, initialDelay, retryInterval);
|
||||
+ "retryInterval={}, maxRetries={})", keyNames.length, initialDelay, retryInterval,
|
||||
maxRetries);
|
||||
try {
|
||||
Thread.sleep(initialDelay);
|
||||
} catch (InterruptedException ie) {
|
||||
@ -577,42 +580,39 @@ public void run() {
|
||||
return;
|
||||
}
|
||||
|
||||
final int logCoolDown = 10000; // periodically print error log (if any)
|
||||
int sinceLastLog = logCoolDown; // always print the first failure
|
||||
boolean success = false;
|
||||
int retryCount = 0;
|
||||
IOException lastSeenIOE = null;
|
||||
long warmUpEDEKStartTime = monotonicNow();
|
||||
while (true) {
|
||||
|
||||
while (!success && retryCount < maxRetries) {
|
||||
try {
|
||||
kp.warmUpEncryptedKeys(keyNames);
|
||||
NameNode.LOG
|
||||
.info("Successfully warmed up {} EDEKs.", keyNames.length);
|
||||
NameNode.LOG.info("Successfully warmed up {} EDEKs.", keyNames.length);
|
||||
success = true;
|
||||
break;
|
||||
} catch (IOException ioe) {
|
||||
lastSeenIOE = ioe;
|
||||
if (sinceLastLog >= logCoolDown) {
|
||||
NameNode.LOG.info("Failed to warm up EDEKs.", ioe);
|
||||
sinceLastLog = 0;
|
||||
} else {
|
||||
NameNode.LOG.debug("Failed to warm up EDEKs.", ioe);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
NameNode.LOG.error("Cannot warm up EDEKs.", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
try {
|
||||
Thread.sleep(retryInterval);
|
||||
} catch (InterruptedException ie) {
|
||||
NameNode.LOG.info("EDEKCacheLoader interrupted during retry.");
|
||||
break;
|
||||
}
|
||||
sinceLastLog += retryInterval;
|
||||
retryCount++;
|
||||
}
|
||||
}
|
||||
|
||||
long warmUpEDEKTime = monotonicNow() - warmUpEDEKStartTime;
|
||||
NameNode.getNameNodeMetrics().addWarmUpEDEKTime(warmUpEDEKTime);
|
||||
if (!success) {
|
||||
NameNode.LOG.warn("Unable to warm up EDEKs.");
|
||||
NameNode.LOG.warn("Max retry {} reached, unable to warm up EDEKs.", maxRetries);
|
||||
if (lastSeenIOE != null) {
|
||||
NameNode.LOG.warn("Last seen exception:", lastSeenIOE);
|
||||
}
|
||||
|
@ -579,6 +579,7 @@ private boolean isFromProxyUser(CallerContext ctx) {
|
||||
private ExecutorService edekCacheLoader = null;
|
||||
private final int edekCacheLoaderDelay;
|
||||
private final int edekCacheLoaderInterval;
|
||||
private final int edekCacheLoaderMaxRetries;
|
||||
|
||||
/**
|
||||
* When an active namenode will roll its own edit log, in # edits
|
||||
@ -1012,6 +1013,9 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
this.edekCacheLoaderInterval = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
|
||||
this.edekCacheLoaderMaxRetries = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_MAX_RETRIES_DEFAULT);
|
||||
|
||||
this.leaseRecheckIntervalMs = conf.getLong(
|
||||
DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
|
||||
@ -1470,8 +1474,9 @@ void startActiveServices() throws IOException {
|
||||
new ThreadFactoryBuilder().setDaemon(true)
|
||||
.setNameFormat("Warm Up EDEK Cache Thread #%d")
|
||||
.build());
|
||||
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
|
||||
edekCacheLoaderDelay, edekCacheLoaderInterval);
|
||||
FSDirEncryptionZoneOp
|
||||
.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval,
|
||||
edekCacheLoaderMaxRetries);
|
||||
}
|
||||
if (blockManager.getSPSManager() != null) {
|
||||
blockManager.getSPSManager().start();
|
||||
|
@ -3614,6 +3614,14 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.edekcacheloader.max-retries</name>
|
||||
<value>10</value>
|
||||
<description>When KeyProvider is configured, the max retries allowed to attempt
|
||||
warm up edek cache if none of key successful on NN start up / become active.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.reencrypt.sleep.interval</name>
|
||||
<value>1m</value>
|
||||
|
@ -0,0 +1,59 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestFSDirEncryptionZoneOp {
|
||||
|
||||
@Test
|
||||
public void testWarmUpEdekCacheRetries() throws IOException {
|
||||
NameNode.initMetrics(new Configuration(), NamenodeRole.NAMENODE);
|
||||
|
||||
final int initialDelay = 100;
|
||||
final int retryInterval = 100;
|
||||
final int maxRetries = 2;
|
||||
|
||||
KeyProviderCryptoExtension kpMock = mock(KeyProviderCryptoExtension.class);
|
||||
|
||||
doThrow(new IOException())
|
||||
.doThrow(new IOException())
|
||||
.doAnswer(invocation -> null)
|
||||
.when(kpMock).warmUpEncryptedKeys(any());
|
||||
|
||||
FSDirEncryptionZoneOp.EDEKCacheLoader loader =
|
||||
new FSDirEncryptionZoneOp.EDEKCacheLoader(new String[] {"edek1", "edek2"}, kpMock,
|
||||
initialDelay, retryInterval, maxRetries);
|
||||
|
||||
loader.run();
|
||||
|
||||
verify(kpMock, times(maxRetries)).warmUpEncryptedKeys(any());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user