HDFS-2563. Some cleanup in BPOfferService. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1203943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-11-19 01:31:59 +00:00
parent 26447229ba
commit 1f92266516
4 changed files with 238 additions and 173 deletions

View File

@ -117,6 +117,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2562. Refactor DN configuration variables out of DataNode class HDFS-2562. Refactor DN configuration variables out of DataNode class
(todd) (todd)
HDFS-2563. Some cleanup in BPOfferService. (todd)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd) HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@ -176,6 +176,9 @@
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/********************************************************** /**********************************************************
* DataNode is a class (and program) that stores a set of * DataNode is a class (and program) that stores a set of
@ -704,8 +707,21 @@ void setHeartbeatsDisabledForTests(
@InterfaceAudience.Private @InterfaceAudience.Private
static class BPOfferService implements Runnable { static class BPOfferService implements Runnable {
final InetSocketAddress nnAddr; final InetSocketAddress nnAddr;
DatanodeRegistration bpRegistration;
/**
* Information about the namespace that this service
* is registering with. This is assigned after
* the first phase of the handshake.
*/
NamespaceInfo bpNSInfo; NamespaceInfo bpNSInfo;
/**
* The registration information for this block pool.
* This is assigned after the second phase of the
* handshake.
*/
DatanodeRegistration bpRegistration;
long lastBlockReport = 0; long lastBlockReport = 0;
long lastDeletedReport = 0; long lastDeletedReport = 0;
@ -713,7 +729,6 @@ static class BPOfferService implements Runnable {
private Thread bpThread; private Thread bpThread;
private DatanodeProtocol bpNamenode; private DatanodeProtocol bpNamenode;
private String blockPoolId;
private long lastHeartbeat = 0; private long lastHeartbeat = 0;
private volatile boolean initialized = false; private volatile boolean initialized = false;
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
@ -726,7 +741,6 @@ static class BPOfferService implements Runnable {
BPOfferService(InetSocketAddress nnAddr, DataNode dn) { BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
this.dn = dn; this.dn = dn;
this.bpRegistration = dn.createRegistration();
this.nnAddr = nnAddr; this.nnAddr = nnAddr;
this.dnConf = dn.getDnConf(); this.dnConf = dn.getDnConf();
} }
@ -736,7 +750,7 @@ static class BPOfferService implements Runnable {
* and has registered with the corresponding namenode * and has registered with the corresponding namenode
* @return true if initialized * @return true if initialized
*/ */
public boolean initialized() { public boolean isInitialized() {
return initialized; return initialized;
} }
@ -745,41 +759,67 @@ public boolean isAlive() {
} }
public String getBlockPoolId() { public String getBlockPoolId() {
return blockPoolId; if (bpNSInfo != null) {
return bpNSInfo.getBlockPoolID();
} else {
LOG.warn("Block pool ID needed, but service not yet registered with NN",
new Exception("trace"));
return null;
}
}
public NamespaceInfo getNamespaceInfo() {
return bpNSInfo;
}
public String toString() {
if (bpNSInfo == null) {
// If we haven't yet connected to our NN, we don't yet know our
// own block pool ID.
// If _none_ of the block pools have connected yet, we don't even
// know the storage ID of this DN.
String storageId = dn.getStorageId();
if (storageId == null || "".equals(storageId)) {
storageId = "unknown";
}
return "Block pool <registering> (storage id " + storageId +
") connecting to " + nnAddr;
} else {
return "Block pool " + getBlockPoolId() +
" (storage id " + dn.getStorageId() +
") registered with " + nnAddr;
}
} }
private InetSocketAddress getNNSocketAddress() { private InetSocketAddress getNNSocketAddress() {
return nnAddr; return nnAddr;
} }
void setNamespaceInfo(NamespaceInfo nsinfo) { /**
bpNSInfo = nsinfo; * Used to inject a spy NN in the unit tests.
this.blockPoolId = nsinfo.getBlockPoolID(); */
} @VisibleForTesting
void setNameNode(DatanodeProtocol dnProtocol) { void setNameNode(DatanodeProtocol dnProtocol) {
bpNamenode = dnProtocol; bpNamenode = dnProtocol;
} }
private NamespaceInfo handshake() throws IOException { /**
NamespaceInfo nsInfo = new NamespaceInfo(); * Perform the first part of the handshake with the NameNode.
while (dn.shouldRun && shouldServiceRun) { * This calls <code>versionRequest</code> to determine the NN's
* namespace and version info. It automatically retries until
* the NN responds or the DN is shutting down.
*
* @return the NamespaceInfo
* @throws IncorrectVersionException if the remote NN does not match
* this DN's version
*/
NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
NamespaceInfo nsInfo = null;
while (shouldRun()) {
try { try {
nsInfo = bpNamenode.versionRequest(); nsInfo = bpNamenode.versionRequest();
// verify build version LOG.debug(this + " received versionRequest response: " + nsInfo);
String nsVer = nsInfo.getBuildVersion(); break;
String stVer = Storage.getBuildVersion();
LOG.info("handshake: namespace info = " + nsInfo);
if(! nsVer.equals(stVer)) {
String errorMsg = "Incompatible build versions: bp = " + blockPoolId +
"namenode BV = " + nsVer + "; datanode BV = " + stVer;
LOG.warn(errorMsg);
bpNamenode.errorReport( bpRegistration,
DatanodeProtocol.NOTIFY, errorMsg );
} else {
break;
}
} catch(SocketTimeoutException e) { // namenode is busy } catch(SocketTimeoutException e) { // namenode is busy
LOG.warn("Problem connecting to server: " + nnAddr); LOG.warn("Problem connecting to server: " + nnAddr);
} catch(IOException e ) { // namenode is not available } catch(IOException e ) { // namenode is not available
@ -787,40 +827,53 @@ private NamespaceInfo handshake() throws IOException {
} }
// try again in a second // try again in a second
try { sleepAndLogInterrupts(5000, "requesting version info from NN");
Thread.sleep(5000);
} catch (InterruptedException ie) {}
} }
assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : if (nsInfo != null) {
"Data-node and name-node layout versions must be the same." checkNNVersion(nsInfo);
+ "Expected: "+ HdfsConstants.LAYOUT_VERSION }
+ " actual "+ nsInfo.getLayoutVersion();
return nsInfo; return nsInfo;
} }
void setupBP(Configuration conf) private void checkNNVersion(NamespaceInfo nsInfo)
throws IOException { throws IncorrectVersionException {
// get NN proxy // build and layout versions should match
DatanodeProtocol dnp = String nsBuildVer = nsInfo.getBuildVersion();
(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, String stBuildVer = Storage.getBuildVersion();
DatanodeProtocol.versionID, nnAddr, conf); if (!nsBuildVer.equals(stBuildVer)) {
setNameNode(dnp); LOG.warn("Data-node and name-node Build versions must be the same. " +
"Namenode build version: " + nsBuildVer + "Datanode " +
// handshake with NN "build version: " + stBuildVer);
NamespaceInfo nsInfo = handshake(); throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
setNamespaceInfo(nsInfo);
dn.initBlockPool(this, nsInfo);
bpRegistration.setStorageID(dn.getStorageId());
StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.setStorageInfo(nsInfo);
} else {
bpRegistration.setStorageInfo(storageInfo);
} }
if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
LOG.warn("Data-node and name-node layout versions must be the same." +
" Expected: "+ HdfsConstants.LAYOUT_VERSION +
" actual "+ bpNSInfo.getLayoutVersion());
throw new IncorrectVersionException(
bpNSInfo.getLayoutVersion(), "namenode");
}
}
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode =
(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID, nnAddr, dn.getConf());
// First phase of the handshake with NN - get the namespace
// info.
bpNSInfo = retrieveNamespaceInfo();
// Now that we know the namespace ID, etc, we can pass this to the DN.
// The DN can now initialize its local storage if we are the
// first BP to handshake, etc.
dn.initBlockPool(this);
// Second phase of the handshake with the NN.
register();
} }
/** /**
@ -875,7 +928,7 @@ private void reportReceivedDeletedBlocks() throws IOException {
} }
} }
if (receivedAndDeletedBlockArray != null) { if (receivedAndDeletedBlockArray != null) {
bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId, bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
receivedAndDeletedBlockArray); receivedAndDeletedBlockArray);
synchronized (receivedAndDeletedBlockList) { synchronized (receivedAndDeletedBlockList) {
for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
@ -897,9 +950,9 @@ void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
: "delHint is null"); : "delHint is null");
} }
if (!block.getBlockPoolId().equals(blockPoolId)) { if (!block.getBlockPoolId().equals(getBlockPoolId())) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ blockPoolId); + getBlockPoolId());
return; return;
} }
@ -916,9 +969,9 @@ void notifyNamenodeDeletedBlock(ExtendedBlock block) {
throw new IllegalArgumentException("Block is null"); throw new IllegalArgumentException("Block is null");
} }
if (!block.getBlockPoolId().equals(blockPoolId)) { if (!block.getBlockPoolId().equals(getBlockPoolId())) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ blockPoolId); + getBlockPoolId());
return; return;
} }
@ -941,11 +994,11 @@ DatanodeCommand blockReport() throws IOException {
// Create block report // Create block report
long brCreateStartTime = now(); long brCreateStartTime = now();
BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId); BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
// Send block report // Send block report
long brSendStartTime = now(); long brSendStartTime = now();
cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
.getBlockListAsLongs()); .getBlockListAsLongs());
// Log the block report processing stats from Datanode perspective // Log the block report processing stats from Datanode perspective
@ -982,7 +1035,7 @@ DatanodeCommand blockReport() throws IOException {
dn.data.getCapacity(), dn.data.getCapacity(),
dn.data.getDfsUsed(), dn.data.getDfsUsed(),
dn.data.getRemaining(), dn.data.getRemaining(),
dn.data.getBlockPoolUsed(blockPoolId), dn.data.getBlockPoolUsed(getBlockPoolId()),
dn.xmitsInProgress.get(), dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes()); dn.getXceiverCount(), dn.data.getNumFailedVolumes());
} }
@ -1039,7 +1092,7 @@ private void offerService() throws Exception {
// //
// Now loop for a long time.... // Now loop for a long time....
// //
while (dn.shouldRun && shouldServiceRun) { while (shouldRun()) {
try { try {
long startTime = now(); long startTime = now();
@ -1080,7 +1133,7 @@ private void offerService() throws Exception {
// Now safe to start scanning the block pool // Now safe to start scanning the block pool
if (dn.blockScanner != null) { if (dn.blockScanner != null) {
dn.blockScanner.addBlockPool(this.blockPoolId); dn.blockScanner.addBlockPool(this.getBlockPoolId());
} }
// //
@ -1094,8 +1147,7 @@ private void offerService() throws Exception {
try { try {
receivedAndDeletedBlockList.wait(waitTime); receivedAndDeletedBlockList.wait(waitTime);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("BPOfferService for block pool=" LOG.warn("BPOfferService for " + this + " interrupted");
+ this.getBlockPoolId() + " received exception:" + ie);
} }
} }
} // synchronized } // synchronized
@ -1104,7 +1156,7 @@ private void offerService() throws Exception {
if (UnregisteredNodeException.class.getName().equals(reClass) || if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) { IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("blockpool " + blockPoolId + " is shutting down", re); LOG.warn(this + " is shutting down", re);
shouldServiceRun = false; shouldServiceRun = false;
return; return;
} }
@ -1118,7 +1170,7 @@ private void offerService() throws Exception {
} catch (IOException e) { } catch (IOException e) {
LOG.warn("IOException in offerService", e); LOG.warn("IOException in offerService", e);
} }
} // while (shouldRun && shouldServiceRun) } // while (shouldRun())
} // offerService } // offerService
/** /**
@ -1134,54 +1186,44 @@ private void offerService() throws Exception {
* @throws IOException * @throws IOException
*/ */
void register() throws IOException { void register() throws IOException {
LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI=" Preconditions.checkState(bpNSInfo != null,
+ bpRegistration.storageInfo); "register() should be called after handshake()");
// build and layout versions should match // The handshake() phase loaded the block pool storage
String nsBuildVer = bpNamenode.versionRequest().getBuildVersion(); // off disk - so update the bpRegistration object from that info
String stBuildVer = Storage.getBuildVersion(); bpRegistration = dn.createBPRegistration(bpNSInfo);
if (!nsBuildVer.equals(stBuildVer)) { LOG.info(this + " beginning handshake with NN");
LOG.warn("Data-node and name-node Build versions must be " +
"the same. Namenode build version: " + nsBuildVer + "Datanode " +
"build version: " + stBuildVer);
throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
}
if (HdfsConstants.LAYOUT_VERSION != bpNSInfo.getLayoutVersion()) { while (shouldRun()) {
LOG.warn("Data-node and name-node layout versions must be " +
"the same. Expected: "+ HdfsConstants.LAYOUT_VERSION +
" actual "+ bpNSInfo.getLayoutVersion());
throw new IncorrectVersionException
(bpNSInfo.getLayoutVersion(), "namenode");
}
while(dn.shouldRun && shouldServiceRun) {
try { try {
// Use returned registration from namenode with updated machine name. // Use returned registration from namenode with updated machine name.
bpRegistration = bpNamenode.registerDatanode(bpRegistration); bpRegistration = bpNamenode.registerDatanode(bpRegistration);
LOG.info("bpReg after =" + bpRegistration.storageInfo +
";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
break; break;
} catch(SocketTimeoutException e) { // namenode is busy } catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr); LOG.info("Problem connecting to server: " + nnAddr);
try { sleepAndLogInterrupts(1000, "connecting to server");
Thread.sleep(1000);
} catch (InterruptedException ie) {}
} }
} }
dn.bpRegistrationSucceeded(bpRegistration, blockPoolId); LOG.info("Block pool " + this + " successfully registered with NN");
dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
// random short delay - helps scatter the BR from all DNs // random short delay - helps scatter the BR from all DNs
scheduleBlockReport(dnConf.initialBlockReportDelay); scheduleBlockReport(dnConf.initialBlockReportDelay);
} }
private void sleepAndLogInterrupts(int millis,
String stateString) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
LOG.info("BPOfferService " + this +
" interrupted while " + stateString);
}
}
/** /**
* No matter what kind of exception we get, keep retrying to offerService(). * No matter what kind of exception we get, keep retrying to offerService().
* That's the loop that connects to the NameNode and provides basic DataNode * That's the loop that connects to the NameNode and provides basic DataNode
@ -1192,49 +1234,43 @@ void register() throws IOException {
*/ */
@Override @Override
public void run() { public void run() {
LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data LOG.info(this + " starting to offer service");
+ ";bp=" + blockPoolId);
try { try {
// init stuff // init stuff
try { try {
// setup storage // setup storage
setupBP(dn.conf); connectToNNAndHandshake();
register();
} catch (IOException ioe) { } catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed // Initial handshake, storage recovery or registration failed
// End BPOfferService thread // End BPOfferService thread
LOG.fatal(bpRegistration + " initialization failed for block pool " LOG.fatal("Initialization failed for block pool " + this, ioe);
+ blockPoolId, ioe);
return; return;
} }
initialized = true; // bp is initialized; initialized = true; // bp is initialized;
while (dn.shouldRun && shouldServiceRun) { while (shouldRun()) {
try { try {
startDistributedUpgradeIfNeeded(); startDistributedUpgradeIfNeeded();
offerService(); offerService();
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Exception in BPOfferService", ex); LOG.error("Exception in BPOfferService for " + this, ex);
if (dn.shouldRun && shouldServiceRun) { sleepAndLogInterrupts(5000, "offering service");
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
LOG.warn("Received exception", ie);
}
}
} }
} }
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.warn("Unexpected exception", ex); LOG.warn("Unexpected exception in block pool " + this, ex);
} finally { } finally {
LOG.warn(bpRegistration + " ending block pool service for: " LOG.warn("Ending block pool service for: " + this);
+ blockPoolId + " thread " + Thread.currentThread().getId());
cleanUp(); cleanUp();
} }
} }
private boolean shouldRun() {
return shouldServiceRun && dn.shouldRun();
}
/** /**
* Process an array of datanode commands * Process an array of datanode commands
* *
@ -1299,7 +1335,11 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
case DatanodeProtocol.DNA_REGISTER: case DatanodeProtocol.DNA_REGISTER:
// namenode requested a registration - at start or if NN lost contact // namenode requested a registration - at start or if NN lost contact
LOG.info("DatanodeCommand action: DNA_REGISTER"); LOG.info("DatanodeCommand action: DNA_REGISTER");
if (dn.shouldRun && shouldServiceRun) { if (shouldRun()) {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
retrieveNamespaceInfo();
// and re-register
register(); register();
} }
break; break;
@ -1317,7 +1357,7 @@ private boolean processCommand(DatanodeCommand cmd) throws IOException {
case DatanodeProtocol.DNA_ACCESSKEYUPDATE: case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
if (dn.isBlockTokenEnabled) { if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.setKeys(blockPoolId, dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(),
((KeyUpdateCommand) cmd).getExportedKeys()); ((KeyUpdateCommand) cmd).getExportedKeys());
} }
break; break;
@ -1346,7 +1386,7 @@ private void processDistributedUpgradeCommand(UpgradeCommand comm)
synchronized UpgradeManagerDatanode getUpgradeManager() { synchronized UpgradeManagerDatanode getUpgradeManager() {
if(upgradeManager == null) if(upgradeManager == null)
upgradeManager = upgradeManager =
new UpgradeManagerDatanode(dn, blockPoolId); new UpgradeManagerDatanode(dn, getBlockPoolId());
return upgradeManager; return upgradeManager;
} }
@ -1363,6 +1403,7 @@ private void startDistributedUpgradeIfNeeded() throws IOException {
um.startUpgrade(); um.startUpgrade();
return; return;
} }
} }
/** /**
@ -1406,6 +1447,26 @@ void startDataNode(Configuration conf,
blockPoolManager = new BlockPoolManager(conf); blockPoolManager = new BlockPoolManager(conf);
} }
/**
* Create a DatanodeRegistration for a specific block pool.
* @param nsInfo the namespace info from the first part of the NN handshake
*/
DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
DatanodeRegistration bpRegistration = createUnknownBPRegistration();
String blockPoolId = nsInfo.getBlockPoolID();
bpRegistration.setStorageID(getStorageId());
StorageInfo storageInfo = storage.getBPStorage(blockPoolId);
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.setStorageInfo(nsInfo);
} else {
bpRegistration.setStorageInfo(storageInfo);
}
return bpRegistration;
}
/** /**
* Check that the registration returned from a NameNode is consistent * Check that the registration returned from a NameNode is consistent
* with the information in the storage. If the storage is fresh/unformatted, * with the information in the storage. If the storage is fresh/unformatted,
@ -1486,11 +1547,27 @@ private void shutdownBlockPool(BPOfferService bpos) {
} }
} }
void initBlockPool(BPOfferService bpOfferService, /**
NamespaceInfo nsInfo) throws IOException { * One of the Block Pools has successfully connected to its NN.
* This initializes the local storage for that block pool,
* checks consistency of the NN's cluster ID, etc.
*
* If this is the first block pool to register, this also initializes
* the datanode-scoped storage.
*
* @param nsInfo the handshake response from the NN.
* @throws IOException if the NN is inconsistent with the local storage.
*/
void initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
Preconditions.checkState(nsInfo != null,
"Block pool " + bpos + " should have retrieved " +
"its namespace info before calling initBlockPool.");
String blockPoolId = nsInfo.getBlockPoolID(); String blockPoolId = nsInfo.getBlockPoolID();
blockPoolManager.addBlockPool(bpOfferService); // Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos);
synchronized (this) { synchronized (this) {
// we do not allow namenode from different cluster to register // we do not allow namenode from different cluster to register
@ -1521,12 +1598,21 @@ void initBlockPool(BPOfferService bpOfferService,
+ blockPoolId + ";lv=" + storage.getLayoutVersion() + + blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo); ";nsInfo=" + nsInfo);
} }
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initFsDataSet(); initFsDataSet();
initPeriodicScanners(conf); initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
data.addBlockPool(blockPoolId, conf);
} }
private DatanodeRegistration createRegistration() { /**
* Create a DatanodeRegistration object with no valid StorageInfo.
* This is used when reporting an error during handshake - ie
* before we can load any specific block pool.
*/
private DatanodeRegistration createUnknownBPRegistration() {
DatanodeRegistration reg = new DatanodeRegistration(getMachineName()); DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
reg.setInfoPort(infoServer.getPort()); reg.setInfoPort(infoServer.getPort());
reg.setIpcPort(getIpcPort()); reg.setIpcPort(getIpcPort());
@ -2554,16 +2640,6 @@ public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
return bpos.bpNamenode; return bpos.bpNamenode;
} }
/**
* To be used by tests only to set a mock namenode in BPOfferService
*/
void setBPNamenode(String bpid, DatanodeProtocol namenode) {
BPOfferService bp = blockPoolManager.get(bpid);
if (bp != null) {
bp.setNameNode(namenode);
}
}
/** Block synchronization */ /** Block synchronization */
void syncBlock(RecoveringBlock rBlock, void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException { List<BlockRecord> syncList) throws IOException {
@ -2789,7 +2865,7 @@ public String getNamenodeAddresses() {
final Map<String, String> info = new HashMap<String, String>(); final Map<String, String> info = new HashMap<String, String>();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null && bpos.bpThread != null) { if (bpos != null && bpos.bpThread != null) {
info.put(bpos.getNNSocketAddress().getHostName(), bpos.blockPoolId); info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
} }
} }
return JSON.toString(info); return JSON.toString(info);
@ -2877,7 +2953,7 @@ public boolean isBPServiceAlive(String bpid) {
*/ */
public boolean isDatanodeFullyStarted() { public boolean isDatanodeFullyStarted() {
for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
if (!bp.initialized() || !bp.isAlive()) { if (!bp.isInitialized() || !bp.isAlive()) {
return false; return false;
} }
} }
@ -2904,4 +2980,8 @@ public Long getBalancerBandwidth() {
DNConf getDnConf() { DNConf getDnConf() {
return dnConf; return dnConf;
} }
boolean shouldRun() {
return shouldRun;
}
} }

View File

@ -21,10 +21,7 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
/** /**
* Utility class for accessing package-private DataNode information during tests. * Utility class for accessing package-private DataNode information during tests.
@ -41,27 +38,4 @@ public class DataNodeTestUtils {
return dn.getDNRegistrationForBP(bpid); return dn.getDNRegistrationForBP(bpid);
} }
/**
* manually setup datanode to testing
* @param dn - datanode
* @param nsifno - namenode info
* @param bpid - block pool id
* @param nn - namenode object
* @throws IOException
*/
public static void setBPNamenodeByIndex(DataNode dn,
NamespaceInfo nsifno, String bpid, DatanodeProtocol nn)
throws IOException {
// setup the right BPOS..
BPOfferService [] bposs = dn.getAllBpOs();
if(bposs.length<0) {
throw new IOException("Datanode wasn't initializes with at least one NN");
}
for(BPOfferService bpos : bposs) {
bpos.setNamespaceInfo(nsifno);
dn.setBPNamenode(bpid, nn);
dn.initBlockPool(bpos, nsifno);
}
}
} }

View File

@ -20,6 +20,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractList; import java.util.AbstractList;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -28,29 +29,37 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestDatanodeRegister { public class TestDatanodeRegister {
public static final Log LOG = LogFactory.getLog(TestDatanodeRegister.class); public static final Log LOG = LogFactory.getLog(TestDatanodeRegister.class);
// Invalid address
static final InetSocketAddress INVALID_ADDR =
new InetSocketAddress("127.0.0.1", 1);
@Test @Test
public void testDataNodeRegister() throws Exception { public void testDataNodeRegister() throws Exception {
DataNode.BPOfferService myMockBPOS = mock(DataNode.BPOfferService.class); DataNode mockDN = mock(DataNode.class);
doCallRealMethod().when(myMockBPOS).register(); Mockito.doReturn(true).when(mockDN).shouldRun();
myMockBPOS.bpRegistration = mock(DatanodeRegistration.class);
when(myMockBPOS.bpRegistration.getStorageID()).thenReturn("myTestStorageID"); BPOfferService bpos = new DataNode.BPOfferService(INVALID_ADDR, mockDN);
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class); NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion"); when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class); DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class);
when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo); when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
doCallRealMethod().when(myMockBPOS).setNameNode(fakeDNProt);
myMockBPOS.setNameNode( fakeDNProt ); bpos.setNameNode( fakeDNProt );
bpos.bpNSInfo = fakeNSInfo;
try { try {
myMockBPOS.register(); bpos.retrieveNamespaceInfo();
fail("register() did not throw exception! " + fail("register() did not throw exception! " +
"Expected: IncorrectVersionException"); "Expected: IncorrectVersionException");
} catch (IncorrectVersionException ie) { } catch (IncorrectVersionException ie) {