HDFS-2560. Refactor BPOfferService to be a static inner class. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-18 00:45:31 +00:00
parent 7edfff5795
commit 0864ef1908
4 changed files with 212 additions and 160 deletions

View File

@ -107,6 +107,7 @@ Release 0.23.1 - UNRELEASED
NEW FEATURES
IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
OPTIMIZATIONS

View File

@ -55,6 +55,10 @@ synchronized BlockTokenSecretManager get(String bpid) {
}
return secretMgr;
}
public synchronized boolean isBlockPoolRegistered(String bpid) {
return map.containsKey(bpid);
}
/** Return an empty BlockTokenIdentifer */
@Override

View File

@ -136,6 +136,7 @@
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@ -275,7 +276,7 @@ class BlockPoolManager {
List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
for(InetSocketAddress isa : isas) {
BPOfferService bpos = new BPOfferService(isa);
BPOfferService bpos = new BPOfferService(isa, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
}
@ -373,19 +374,19 @@ void refreshNamenodes(Configuration conf)
}
for (InetSocketAddress nnaddr : toStart) {
BPOfferService bpos = new BPOfferService(nnaddr);
BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
}
for (BPOfferService bpos : toShutdown) {
remove(bpos);
}
}
for (BPOfferService bpos : toShutdown) {
bpos.stop();
bpos.join();
}
// stoping the BPOSes causes them to call remove() on their own when they
// clean up.
// Now start the threads that are not already running.
startAll();
}
@ -402,9 +403,7 @@ void refreshNamenodes(Configuration conf)
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
long blockReportInterval;
boolean resetBlockReportTime = true;
long deleteReportInterval;
long lastDeletedReport = 0;
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
long heartBeatInterval;
private boolean heartbeatsDisabledForTests = false;
@ -653,6 +652,7 @@ private synchronized void initDataBlockScanner(Configuration conf) {
return;
}
String reason = null;
assert data != null;
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration";
@ -774,11 +774,15 @@ void setHeartbeatsDisabledForTests(
* </ul>
*/
@InterfaceAudience.Private
class BPOfferService implements Runnable {
static class BPOfferService implements Runnable {
final InetSocketAddress nnAddr;
DatanodeRegistration bpRegistration;
NamespaceInfo bpNSInfo;
long lastBlockReport = 0;
long lastDeletedReport = 0;
boolean resetBlockReportTime = true;
private Thread bpThread;
private DatanodeProtocol bpNamenode;
private String blockPoolId;
@ -788,14 +792,13 @@ class BPOfferService implements Runnable {
= new LinkedList<ReceivedDeletedBlockInfo>();
private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
private boolean isBlockTokenInitialized = false;
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
BPOfferService(InetSocketAddress isa) {
this.bpRegistration = new DatanodeRegistration(getMachineName());
bpRegistration.setInfoPort(infoServer.getPort());
bpRegistration.setIpcPort(getIpcPort());
this.nnAddr = isa;
BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
this.dn = dn;
this.bpRegistration = dn.createRegistration();
this.nnAddr = nnAddr;
}
/**
@ -822,7 +825,6 @@ private InetSocketAddress getNNSocketAddress() {
void setNamespaceInfo(NamespaceInfo nsinfo) {
bpNSInfo = nsinfo;
this.blockPoolId = nsinfo.getBlockPoolID();
blockPoolManager.addBlockPool(this);
}
void setNameNode(DatanodeProtocol dnProtocol) {
@ -831,7 +833,7 @@ void setNameNode(DatanodeProtocol dnProtocol) {
private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
nsInfo = bpNamenode.versionRequest();
// verify build version
@ -867,7 +869,7 @@ private NamespaceInfo handshake() throws IOException {
return nsInfo;
}
void setupBP(Configuration conf, AbstractList<File> dataDirs)
void setupBP(Configuration conf)
throws IOException {
// get NN proxy
DatanodeProtocol dnp =
@ -878,52 +880,19 @@ void setupBP(Configuration conf, AbstractList<File> dataDirs)
// handshake with NN
NamespaceInfo nsInfo = handshake();
setNamespaceInfo(nsInfo);
synchronized(DataNode.this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setupBPStorage();
setClusterId(nsInfo.clusterID);
}
initPeriodicScanners(conf);
}
void setupBPStorage() throws IOException {
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
dn.initBlockPool(this, nsInfo);
if (simulatedFSDataset) {
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageID(getStorageId()); //same as DN
bpRegistration.setStorageID(dn.getStorageId());
StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
bpRegistration.setStorageInfo(nsInfo);
} else {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
dataDirs, startOpt);
LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+ blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+ bpNSInfo);
bpRegistration.setStorageID(getStorageId());
bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
initFsDataSet(conf, dataDirs);
bpRegistration.setStorageInfo(storageInfo);
}
data.addBlockPool(blockPoolId, conf);
}
/**
* This methods arranges for the data node to send the block report at
* the next heartbeat.
@ -931,9 +900,9 @@ void setupBPStorage() throws IOException {
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = System.currentTimeMillis()
- ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
- ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - blockReportInterval;
lastBlockReport = lastHeartbeat - dn.blockReportInterval;
}
resetBlockReportTime = true; // reset future BRs for randomness
}
@ -1038,11 +1007,11 @@ DatanodeCommand blockReport() throws IOException {
// send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
if (startTime - lastBlockReport > dn.blockReportInterval) {
// Create block report
long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
// Send block report
long brSendStartTime = now();
@ -1052,7 +1021,7 @@ DatanodeCommand blockReport() throws IOException {
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
metrics.addBlockReport(brSendCost);
dn.metrics.addBlockReport(brSendCost);
LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ " blocks took " + brCreateCost + " msec to generate and "
+ brSendCost + " msecs for RPC and NN processing");
@ -1060,7 +1029,7 @@ DatanodeCommand blockReport() throws IOException {
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval));
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
@ -1070,7 +1039,7 @@ DatanodeCommand blockReport() throws IOException {
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) /
blockReportInterval * blockReportInterval;
dn.blockReportInterval * dn.blockReportInterval;
}
LOG.info("sent block report, processed command:" + cmd);
}
@ -1080,12 +1049,12 @@ DatanodeCommand blockReport() throws IOException {
DatanodeCommand [] sendHeartBeat() throws IOException {
return bpNamenode.sendHeartbeat(bpRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
data.getBlockPoolUsed(blockPoolId),
xmitsInProgress.get(),
getXceiverCount(), data.getNumFailedVolumes());
dn.data.getCapacity(),
dn.data.getDfsUsed(),
dn.data.getRemaining(),
dn.data.getBlockPoolUsed(blockPoolId),
dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes());
}
//This must be called only by blockPoolManager
@ -1121,21 +1090,9 @@ private synchronized void cleanUp() {
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
blockPoolManager.remove(this);
shouldServiceRun = false;
RPC.stopProxy(bpNamenode);
if (blockScanner != null) {
blockScanner.removeBlockPool(this.getBlockPoolId());
}
if (data != null) {
data.shutdownBlockPool(this.getBlockPoolId());
}
if (storage != null) {
storage.removeBlockPoolStorage(this.getBlockPoolId());
}
dn.shutdownBlockPool(this);
}
/**
@ -1144,22 +1101,22 @@ private synchronized void cleanUp() {
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ blockReportInterval + "msec" + " Initial delay: "
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ heartBeatInterval);
+ dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dn.blockReportInterval + "msec" + " Initial delay: "
+ dn.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dn.heartBeatInterval);
//
// Now loop for a long time....
//
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report
//
if (startTime - lastHeartbeat > heartBeatInterval) {
if (startTime - lastHeartbeat > dn.heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
@ -1168,9 +1125,9 @@ private void offerService() throws Exception {
// -- Bytes remaining
//
lastHeartbeat = startTime;
if (!heartbeatsDisabledForTests) {
if (!dn.heartbeatsDisabledForTests) {
DatanodeCommand[] cmds = sendHeartBeat();
metrics.addHeartbeat(now() - startTime);
dn.metrics.addHeartbeat(now() - startTime);
long startProcessCommands = now();
if (!processCommand(cmds))
@ -1183,7 +1140,7 @@ private void offerService() throws Exception {
}
}
if (pendingReceivedRequests > 0
|| (startTime - lastDeletedReport > deleteReportInterval)) {
|| (startTime - lastDeletedReport > dn.deleteReportInterval)) {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
}
@ -1192,15 +1149,15 @@ private void offerService() throws Exception {
processCommand(cmd);
// Now safe to start scanning the block pool
if (blockScanner != null) {
blockScanner.addBlockPool(this.blockPoolId);
if (dn.blockScanner != null) {
dn.blockScanner.addBlockPool(this.blockPoolId);
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime = heartBeatInterval -
long waitTime = dn.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
synchronized(receivedAndDeletedBlockList) {
if (waitTime > 0 && pendingReceivedRequests == 0) {
@ -1223,7 +1180,7 @@ private void offerService() throws Exception {
}
LOG.warn("RemoteException in offerService", re);
try {
long sleepTime = Math.min(1000, heartBeatInterval);
long sleepTime = Math.min(1000, dn.heartBeatInterval);
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@ -1269,7 +1226,7 @@ void register() throws IOException {
(bpNSInfo.getLayoutVersion(), "namenode");
}
while(shouldRun && shouldServiceRun) {
while(dn.shouldRun && shouldServiceRun) {
try {
// Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
@ -1277,8 +1234,6 @@ void register() throws IOException {
LOG.info("bpReg after =" + bpRegistration.storageInfo +
";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
NetUtils.getHostname();
hostName = bpRegistration.getHost();
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
@ -1287,47 +1242,13 @@ void register() throws IOException {
} catch (InterruptedException ie) {}
}
}
if (storage.getStorageID().equals("")) {
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
if (!isBlockTokenInitialized) {
/* first time registering with NN */
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
isBlockTokenInitialized = true;
}
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay);
scheduleBlockReport(dn.initialBlockReportDelay);
}
@ -1341,14 +1262,14 @@ void register() throws IOException {
*/
@Override
public void run() {
LOG.info(bpRegistration + "In BPOfferService.run, data = " + data
LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
+ ";bp=" + blockPoolId);
try {
// init stuff
try {
// setup storage
setupBP(conf, dataDirs);
setupBP(dn.conf);
register();
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
@ -1360,13 +1281,13 @@ public void run() {
initialized = true; // bp is initialized;
while (shouldRun && shouldServiceRun) {
while (dn.shouldRun && shouldServiceRun) {
try {
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService", ex);
if (shouldRun && shouldServiceRun) {
if (dn.shouldRun && shouldServiceRun) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
@ -1379,7 +1300,7 @@ public void run() {
LOG.warn("Unexpected exception", ex);
} finally {
LOG.warn(bpRegistration + " ending block pool service for: "
+ blockPoolId);
+ blockPoolId + " thread " + Thread.currentThread().getId());
cleanUp();
}
}
@ -1420,8 +1341,8 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
metrics.incrBlocksReplicated(bcmd.getBlocks().length);
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
//
@ -1430,16 +1351,16 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
//
Block toDelete[] = bcmd.getBlocks();
try {
if (blockScanner != null) {
blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
if (dn.blockScanner != null) {
dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
}
// using global fsdataset
data.invalidate(bcmd.getBlockPoolId(), toDelete);
dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
checkDiskError();
dn.checkDiskError();
throw e;
}
metrics.incrBlocksRemoved(toDelete.length);
dn.metrics.incrBlocksRemoved(toDelete.length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// shut down the data node
@ -1448,12 +1369,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
case DatanodeProtocol.DNA_REGISTER:
// namenode requested a registration - at start or if NN lost contact
LOG.info("DatanodeCommand action: DNA_REGISTER");
if (shouldRun && shouldServiceRun) {
if (dn.shouldRun && shouldServiceRun) {
register();
}
break;
case DatanodeProtocol.DNA_FINALIZE:
storage.finalizeUpgrade(((FinalizeCommand) cmd)
dn.storage.finalizeUpgrade(((FinalizeCommand) cmd)
.getBlockPoolId());
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
@ -1461,12 +1382,12 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (isBlockTokenEnabled) {
blockPoolTokenSecretManager.setKeys(blockPoolId,
if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.setKeys(blockPoolId,
((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
@ -1476,7 +1397,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
if (bandwidth > 0) {
DataXceiverServer dxcs =
(DataXceiverServer) dataXceiverServer.getRunnable();
(DataXceiverServer) dn.dataXceiverServer.getRunnable();
dxcs.balanceThrottler.setBandwidth(bandwidth);
}
break;
@ -1495,7 +1416,7 @@ private void processDistributedUpgradeCommand(UpgradeCommand comm)
synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null)
upgradeManager =
new UpgradeManagerDatanode(DataNode.this, blockPoolId);
new UpgradeManagerDatanode(dn, blockPoolId);
return upgradeManager;
}
@ -1555,6 +1476,133 @@ void startDataNode(Configuration conf,
blockPoolManager = new BlockPoolManager(conf);
}
/**
* Check that the registration returned from a NameNode is consistent
* with the information in the storage. If the storage is fresh/unformatted,
* sets the storage ID based on this registration.
* Also updates the block pool's state in the secret manager.
*/
private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId)
throws IOException {
hostName = bpRegistration.getHost();
if (storage.getStorageID().equals("")) {
// This is a fresh datanode -- take the storage ID provided by the
// NN and persist it.
storage.setStorageID(bpRegistration.getStorageID());
storage.writeAll();
LOG.info("New storage id " + bpRegistration.getStorageID()
+ " is assigned to data-node " + bpRegistration.getName());
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
throw new IOException("Inconsistent storage IDs. Name-node returned "
+ bpRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
}
/**
* After the block pool has contacted the NN, registers that block pool
* with the secret manager, updating it with the secrets provided by the NN.
* @param bpRegistration
* @param blockPoolId
* @throws IOException
*/
private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
String blockPoolId) throws IOException {
ExportedBlockKeys keys = bpRegistration.exportedKeys;
isBlockTokenEnabled = keys.isBlockTokenEnabled();
// TODO should we check that all federated nns are either enabled or
// disabled?
if (!isBlockTokenEnabled) return;
if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: for block pool " +
blockPoolId + " keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000)
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
final BlockTokenSecretManager secretMgr =
new BlockTokenSecretManager(false, 0, blockTokenLifetime);
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
}
blockPoolTokenSecretManager.setKeys(blockPoolId,
bpRegistration.exportedKeys);
bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
/**
* Remove the given block pool from the block scanner, dataset, and storage.
*/
private void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
String bpId = bpos.getBlockPoolId();
if (blockScanner != null) {
blockScanner.removeBlockPool(bpId);
}
if (data != null) {
data.shutdownBlockPool(bpId);
}
if (storage != null) {
storage.removeBlockPoolStorage(bpId);
}
}
void initBlockPool(BPOfferService bpOfferService,
NamespaceInfo nsInfo) throws IOException {
String blockPoolId = nsInfo.getBlockPoolID();
blockPoolManager.addBlockPool(bpOfferService);
synchronized (this) {
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setClusterId(nsInfo.clusterID);
}
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (!simulatedFSDataset) {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
dataDirs, startOpt);
StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
LOG.info("setting up storage: nsid=" +
bpStorage.getNamespaceID() + ";bpid="
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo);
}
initFsDataSet();
initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
}
private DatanodeRegistration createRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort());
return reg;
}
BPOfferService[] getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads();
}
@ -1567,8 +1615,7 @@ int getBpOsCount() {
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
private synchronized void initFsDataSet(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
private synchronized void initFsDataSet() throws IOException {
if (data != null) { // Already initialized
return;
}

View File

@ -61,7 +61,7 @@ public static void setBPNamenodeByIndex(DataNode dn,
bpos.setNamespaceInfo(nsifno);
dn.setBPNamenode(bpid, nn);
bpos.setupBPStorage();
dn.initBlockPool(bpos, nsifno);
}
}
}