HDFS-11741. Long running balancer may fail due to expired DataEncryptionKey. Contributed by Wei-Chiu Chuang and Xiao Chen.
This commit is contained in:
parent
ece33208b8
commit
6a3fc685a9
@ -45,6 +45,7 @@
|
|||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.util.Timer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BlockTokenSecretManager can be instantiated in 2 modes, master mode
|
* BlockTokenSecretManager can be instantiated in 2 modes, master mode
|
||||||
@ -83,6 +84,11 @@ public class BlockTokenSecretManager extends
|
|||||||
|
|
||||||
private final SecureRandom nonceGenerator = new SecureRandom();
|
private final SecureRandom nonceGenerator = new SecureRandom();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timer object for querying the current time. Separated out for
|
||||||
|
* unit testing.
|
||||||
|
*/
|
||||||
|
private Timer timer;
|
||||||
/**
|
/**
|
||||||
* Constructor for workers.
|
* Constructor for workers.
|
||||||
*
|
*
|
||||||
@ -130,6 +136,7 @@ private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
|||||||
this.blockPoolId = blockPoolId;
|
this.blockPoolId = blockPoolId;
|
||||||
this.encryptionAlgorithm = encryptionAlgorithm;
|
this.encryptionAlgorithm = encryptionAlgorithm;
|
||||||
this.useProto = useProto;
|
this.useProto = useProto;
|
||||||
|
this.timer = new Timer();
|
||||||
generateKeys();
|
generateKeys();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,10 +167,10 @@ private synchronized void generateKeys() {
|
|||||||
* more.
|
* more.
|
||||||
*/
|
*/
|
||||||
setSerialNo(serialNo + 1);
|
setSerialNo(serialNo + 1);
|
||||||
currentKey = new BlockKey(serialNo, Time.now() + 2
|
currentKey = new BlockKey(serialNo, timer.now() + 2
|
||||||
* keyUpdateInterval + tokenLifetime, generateSecret());
|
* keyUpdateInterval + tokenLifetime, generateSecret());
|
||||||
setSerialNo(serialNo + 1);
|
setSerialNo(serialNo + 1);
|
||||||
nextKey = new BlockKey(serialNo, Time.now() + 3
|
nextKey = new BlockKey(serialNo, timer.now() + 3
|
||||||
* keyUpdateInterval + tokenLifetime, generateSecret());
|
* keyUpdateInterval + tokenLifetime, generateSecret());
|
||||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||||
allKeys.put(nextKey.getKeyId(), nextKey);
|
allKeys.put(nextKey.getKeyId(), nextKey);
|
||||||
@ -180,7 +187,7 @@ public synchronized ExportedBlockKeys exportKeys() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void removeExpiredKeys() {
|
private synchronized void removeExpiredKeys() {
|
||||||
long now = Time.now();
|
long now = timer.now();
|
||||||
for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
|
for (Iterator<Map.Entry<Integer, BlockKey>> it = allKeys.entrySet()
|
||||||
.iterator(); it.hasNext();) {
|
.iterator(); it.hasNext();) {
|
||||||
Map.Entry<Integer, BlockKey> e = it.next();
|
Map.Entry<Integer, BlockKey> e = it.next();
|
||||||
@ -230,15 +237,15 @@ synchronized boolean updateKeys() throws IOException {
|
|||||||
removeExpiredKeys();
|
removeExpiredKeys();
|
||||||
// set final expiry date of retiring currentKey
|
// set final expiry date of retiring currentKey
|
||||||
allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
|
allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
|
||||||
Time.now() + keyUpdateInterval + tokenLifetime,
|
timer.now() + keyUpdateInterval + tokenLifetime,
|
||||||
currentKey.getKey()));
|
currentKey.getKey()));
|
||||||
// update the estimated expiry date of new currentKey
|
// update the estimated expiry date of new currentKey
|
||||||
currentKey = new BlockKey(nextKey.getKeyId(), Time.now()
|
currentKey = new BlockKey(nextKey.getKeyId(), timer.now()
|
||||||
+ 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
|
+ 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
|
||||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||||
// generate a new nextKey
|
// generate a new nextKey
|
||||||
setSerialNo(serialNo + 1);
|
setSerialNo(serialNo + 1);
|
||||||
nextKey = new BlockKey(serialNo, Time.now() + 3
|
nextKey = new BlockKey(serialNo, timer.now() + 3
|
||||||
* keyUpdateInterval + tokenLifetime, generateSecret());
|
* keyUpdateInterval + tokenLifetime, generateSecret());
|
||||||
allKeys.put(nextKey.getKeyId(), nextKey);
|
allKeys.put(nextKey.getKeyId(), nextKey);
|
||||||
return true;
|
return true;
|
||||||
@ -410,7 +417,7 @@ protected byte[] createPassword(BlockTokenIdentifier identifier) {
|
|||||||
}
|
}
|
||||||
if (key == null)
|
if (key == null)
|
||||||
throw new IllegalStateException("currentKey hasn't been initialized.");
|
throw new IllegalStateException("currentKey hasn't been initialized.");
|
||||||
identifier.setExpiryDate(Time.now() + tokenLifetime);
|
identifier.setExpiryDate(timer.now() + tokenLifetime);
|
||||||
identifier.setKeyId(key.getKeyId());
|
identifier.setKeyId(key.getKeyId());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Generating block token for " + identifier.toString());
|
LOG.debug("Generating block token for " + identifier.toString());
|
||||||
@ -461,7 +468,7 @@ public DataEncryptionKey generateDataEncryptionKey() {
|
|||||||
}
|
}
|
||||||
byte[] encryptionKey = createPassword(nonce, key.getKey());
|
byte[] encryptionKey = createPassword(nonce, key.getKey());
|
||||||
return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
|
return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
|
||||||
encryptionKey, Time.now() + tokenLifetime,
|
encryptionKey, timer.now() + tokenLifetime,
|
||||||
encryptionAlgorithm);
|
encryptionAlgorithm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,8 +21,6 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
@ -37,13 +35,16 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.util.Timer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The class provides utilities for key and token management.
|
* The class provides utilities for key and token management.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
||||||
private static final Log LOG = LogFactory.getLog(KeyManager.class);
|
private static final Logger LOG = LoggerFactory.getLogger(KeyManager.class);
|
||||||
|
|
||||||
private final NamenodeProtocol namenode;
|
private final NamenodeProtocol namenode;
|
||||||
|
|
||||||
@ -54,11 +55,17 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
|||||||
private final BlockTokenSecretManager blockTokenSecretManager;
|
private final BlockTokenSecretManager blockTokenSecretManager;
|
||||||
private final BlockKeyUpdater blockKeyUpdater;
|
private final BlockKeyUpdater blockKeyUpdater;
|
||||||
private DataEncryptionKey encryptionKey;
|
private DataEncryptionKey encryptionKey;
|
||||||
|
/**
|
||||||
|
* Timer object for querying the current time. Separated out for
|
||||||
|
* unit testing.
|
||||||
|
*/
|
||||||
|
private Timer timer;
|
||||||
|
|
||||||
public KeyManager(String blockpoolID, NamenodeProtocol namenode,
|
public KeyManager(String blockpoolID, NamenodeProtocol namenode,
|
||||||
boolean encryptDataTransfer, Configuration conf) throws IOException {
|
boolean encryptDataTransfer, Configuration conf) throws IOException {
|
||||||
this.namenode = namenode;
|
this.namenode = namenode;
|
||||||
this.encryptDataTransfer = encryptDataTransfer;
|
this.encryptDataTransfer = encryptDataTransfer;
|
||||||
|
this.timer = new Timer();
|
||||||
|
|
||||||
final ExportedBlockKeys keys = namenode.getBlockKeys();
|
final ExportedBlockKeys keys = namenode.getBlockKeys();
|
||||||
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
|
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
|
||||||
@ -113,7 +120,25 @@ public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
|
|||||||
public DataEncryptionKey newDataEncryptionKey() {
|
public DataEncryptionKey newDataEncryptionKey() {
|
||||||
if (encryptDataTransfer) {
|
if (encryptDataTransfer) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (encryptionKey == null) {
|
if (encryptionKey == null ||
|
||||||
|
encryptionKey.expiryDate < timer.now()) {
|
||||||
|
// Encryption Key (EK) is generated from Block Key (BK).
|
||||||
|
// Check if EK is expired, and generate a new one using the current BK
|
||||||
|
// if so, otherwise continue to use the previously generated EK.
|
||||||
|
//
|
||||||
|
// It's important to make sure that when EK is not expired, the BK
|
||||||
|
// used to generate the EK is not expired and removed, because
|
||||||
|
// the same BK will be used to re-generate the EK
|
||||||
|
// by BlockTokenSecretManager.
|
||||||
|
//
|
||||||
|
// The current implementation ensures that when an EK is not expired
|
||||||
|
// (within tokenLifetime), the BK that's used to generate it
|
||||||
|
// still has at least "keyUpdateInterval" of life time before
|
||||||
|
// the BK gets expired and removed.
|
||||||
|
// See BlockTokenSecretManager for details.
|
||||||
|
LOG.debug("Generating new data encryption key because current key "
|
||||||
|
+ (encryptionKey == null ?
|
||||||
|
"is null." : "expired on " + encryptionKey.expiryDate));
|
||||||
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
|
||||||
}
|
}
|
||||||
return encryptionKey;
|
return encryptionKey;
|
||||||
|
@ -0,0 +1,87 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.balancer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test KeyManager class.
|
||||||
|
*/
|
||||||
|
public class TestKeyManager {
|
||||||
|
@Rule
|
||||||
|
public Timeout globalTimeout = new Timeout(120000);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewDataEncryptionKey() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
// Enable data transport encryption and access token
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
|
|
||||||
|
final long keyUpdateInterval = 2 * 1000;
|
||||||
|
final long tokenLifeTime = keyUpdateInterval;
|
||||||
|
final String blockPoolId = "bp-foo";
|
||||||
|
FakeTimer fakeTimer = new FakeTimer();
|
||||||
|
BlockTokenSecretManager btsm = new BlockTokenSecretManager(
|
||||||
|
keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false);
|
||||||
|
Whitebox.setInternalState(btsm, "timer", fakeTimer);
|
||||||
|
|
||||||
|
// When KeyManager asks for block keys, return them from btsm directly
|
||||||
|
NamenodeProtocol namenode = mock(NamenodeProtocol.class);
|
||||||
|
when(namenode.getBlockKeys()).thenReturn(btsm.exportKeys());
|
||||||
|
|
||||||
|
// Instantiate a KeyManager instance and get data encryption key.
|
||||||
|
KeyManager keyManager = new KeyManager(blockPoolId, namenode,
|
||||||
|
true, conf);
|
||||||
|
Whitebox.setInternalState(keyManager, "timer", fakeTimer);
|
||||||
|
Whitebox.setInternalState(
|
||||||
|
Whitebox.getInternalState(keyManager, "blockTokenSecretManager"),
|
||||||
|
"timer", fakeTimer);
|
||||||
|
final DataEncryptionKey dek = keyManager.newDataEncryptionKey();
|
||||||
|
final long remainingTime = dek.expiryDate - fakeTimer.now();
|
||||||
|
assertEquals("KeyManager dataEncryptionKey should expire in 2 seconds",
|
||||||
|
keyUpdateInterval, remainingTime);
|
||||||
|
// advance the timer to expire the block key and data encryption key
|
||||||
|
fakeTimer.advance(keyUpdateInterval + 1);
|
||||||
|
|
||||||
|
// After the initial data encryption key expires, KeyManager should
|
||||||
|
// regenerate a valid data encryption key using the current block key.
|
||||||
|
final DataEncryptionKey dekAfterExpiration =
|
||||||
|
keyManager.newDataEncryptionKey();
|
||||||
|
assertNotEquals("KeyManager should generate a new data encryption key",
|
||||||
|
dek, dekAfterExpiration);
|
||||||
|
assertTrue("KeyManager has an expired DataEncryptionKey!",
|
||||||
|
dekAfterExpiration.expiryDate > fakeTimer.now());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user