HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due to delay in clearing of lock. Contributed by Uma Maheswara Rao G.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1343913 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-05-29 18:50:07 +00:00
parent 515e29ac82
commit 787c08c05a
7 changed files with 345 additions and 230 deletions

View File

@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream
private CountDownLatch syncLatch;
private final AtomicInteger transmitResult
= new AtomicInteger(BKException.Code.OK);
private final WriteLock wl;
private final Writer writer;
/**
* Construct an edit log output stream which writes to a ledger.
*/
protected BookKeeperEditLogOutputStream(Configuration conf,
LedgerHandle lh, WriteLock wl)
protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
throws IOException {
super();
@ -72,8 +70,6 @@ protected BookKeeperEditLogOutputStream(Configuration conf,
outstandingRequests = new AtomicInteger(0);
syncLatch = null;
this.lh = lh;
this.wl = wl;
this.wl.acquire();
this.writer = new Writer(bufCurrent);
this.transmissionThreshold
= conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
@ -108,7 +104,6 @@ public void abort() throws IOException {
throw new IOException("BookKeeper error during abort", bke);
}
wl.release();
}
@Override
@ -118,8 +113,6 @@ public void writeRaw(final byte[] data, int off, int len) throws IOException {
@Override
public void write(FSEditLogOp op) throws IOException {
wl.checkWriteLock();
writer.writeOp(op);
if (bufCurrent.getLength() > transmissionThreshold) {
@ -129,19 +122,15 @@ public void write(FSEditLogOp op) throws IOException {
@Override
public void setReadyToFlush() throws IOException {
wl.checkWriteLock();
transmit();
synchronized(this) {
synchronized (this) {
syncLatch = new CountDownLatch(outstandingRequests.get());
}
}
@Override
public void flushAndSync() throws IOException {
wl.checkWriteLock();
assert(syncLatch != null);
try {
syncLatch.await();
@ -164,8 +153,6 @@ public void flushAndSync() throws IOException {
* are never called at the same time.
*/
private void transmit() throws IOException {
wl.checkWriteLock();
if (!transmitResult.compareAndSet(BKException.Code.OK,
BKException.Code.OK)) {
throw new IOException("Trying to write to an errored stream;"

View File

@ -117,7 +117,7 @@ public class BookKeeperJournalManager implements JournalManager {
private final ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
private final WriteLock wl;
private final CurrentInprogress ci;
private final String ledgerPath;
private final MaxTxId maxTxId;
private final int ensembleSize;
@ -155,7 +155,7 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
ledgerPath = zkPath + "/ledgers";
String maxTxIdPath = zkPath + "/maxtxid";
String lockPath = zkPath + "/lock";
String currentInprogressNodePath = zkPath + "/CurrentInprogress";
String versionPath = zkPath + "/version";
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
@ -192,7 +192,7 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
throw new IOException("Error initializing zk", e);
}
wl = new WriteLock(zkc, lockPath);
ci = new CurrentInprogress(zkc, currentInprogressNodePath);
maxTxId = new MaxTxId(zkc, maxTxIdPath);
}
@ -207,13 +207,16 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
*/
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
wl.acquire();
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
try {
String existingInprogressNode = ci.read();
if (null != existingInprogressNode
&& zkc.exists(existingInprogressNode, false) != null) {
throw new IOException("Inprogress node already exists");
}
if (currentLedger != null) {
// bookkeeper errored on last stream, clean up ledger
currentLedger.close();
@ -234,7 +237,8 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
l.write(zkc, znodePath);
maxTxId.store(txId);
return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
ci.update(znodePath);
return new BookKeeperEditLogOutputStream(conf, currentLedger);
} catch (Exception e) {
if (currentLedger != null) {
try {
@ -270,7 +274,6 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
+ " doesn't exist");
}
wl.checkWriteLock();
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressPath);
@ -307,13 +310,15 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
}
maxTxId.store(lastTxId);
zkc.delete(inprogressPath, inprogressStat.getVersion());
String inprogressPathFromCI = ci.read();
if (inprogressPath.equals(inprogressPathFromCI)) {
ci.clear();
}
} catch (KeeperException e) {
throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) {
throw new IOException("Error finalising ledger", ie);
} finally {
wl.release();
}
}
}
EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk)
@ -417,7 +422,6 @@ long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
@Override
public void recoverUnfinalizedSegments() throws IOException {
wl.acquire();
synchronized (this) {
try {
List<String> children = zkc.getChildren(ledgerPath, false);
@ -445,10 +449,6 @@ public void recoverUnfinalizedSegments() throws IOException {
} catch (InterruptedException ie) {
throw new IOException("Interrupted getting list of inprogress segments",
ie);
} finally {
if (wl.haveLock()) {
wl.release();
}
}
}
}

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* Distributed write permission lock, using ZooKeeper. Read the version number
* and return the current inprogress node path available in CurrentInprogress
* path. If it exist, caller can treat that some other client already operating
* on it. Then caller can take action. If there is no inprogress node exist,
* then caller can treat that there is no client operating on it. Later same
* caller should update the his newly created inprogress node path. At this
* point, if some other activities done on this node, version number might
* change, so update will fail. So, this read, update api will ensure that there
* is only node can continue further after checking with CurrentInprogress.
*/
class CurrentInprogress {
private static final String CONTENT_DELIMITER = ",";
static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
private final ZooKeeper zkc;
private final String currentInprogressNode;
private volatile int versionNumberForPermission = -1;
private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1;
private final String hostName = InetAddress.getLocalHost().toString();
CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
this.currentInprogressNode = lockpath;
this.zkc = zkc;
try {
Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false);
if (isCurrentInprogressNodeExists == null) {
try {
zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Node might created by other process at the same time. Ignore it.
if (LOG.isDebugEnabled()) {
LOG.debug(lockpath + " already created by other process.", e);
}
}
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
/**
* Update the path with prepending version number and hostname
*
* @param path
* - to be updated in zookeeper
* @throws IOException
*/
void update(String path) throws IOException {
String content = CURRENT_INPROGRESS_LAYOUT_VERSION
+ CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path;
try {
zkc.setData(this.currentInprogressNode, content.getBytes(),
this.versionNumberForPermission);
} catch (KeeperException e) {
throw new IOException("Exception when setting the data "
+ "[layout version number,hostname,inprogressNode path]= [" + content
+ "] to CurrentInprogress. ", e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while setting the data "
+ "[layout version number,hostname,inprogressNode path]= [" + content
+ "] to CurrentInprogress", e);
}
LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
+ "= [" + content + "] to CurrentInprogress");
}
/**
* Read the CurrentInprogress node data from Zookeeper and also get the znode
* version number. Return the 3rd field from the data. i.e saved path with
* #update api
*
* @return available inprogress node path. returns null if not available.
* @throws IOException
*/
String read() throws IOException {
Stat stat = new Stat();
byte[] data = null;
try {
data = zkc.getData(this.currentInprogressNode, false, stat);
} catch (KeeperException e) {
throw new IOException("Exception while reading the data from "
+ currentInprogressNode, e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while reading data from "
+ currentInprogressNode, e);
}
this.versionNumberForPermission = stat.getVersion();
if (data != null) {
String stringData = new String(data);
LOG.info("Read data[layout version number,hostname,inprogressNode path]"
+ "= [" + stringData + "] from CurrentInprogress");
String[] contents = stringData.split(CONTENT_DELIMITER);
assert contents.length == 3 : "As per the current data format, "
+ "CurrentInprogress node data should contain 3 fields. "
+ "i.e layout version number,hostname,inprogressNode path";
String layoutVersion = contents[0];
if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) {
throw new IOException(
"Supported layout version of CurrentInprogress node is : "
+ CURRENT_INPROGRESS_LAYOUT_VERSION
+ " . Layout version of CurrentInprogress node in ZK is : "
+ layoutVersion);
}
String inprogressNodePath = contents[2];
return inprogressNodePath;
} else {
LOG.info("No data available in CurrentInprogress");
}
return null;
}
/** Clear the CurrentInprogress node data */
void clear() throws IOException {
try {
zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
} catch (KeeperException e) {
throw new IOException(
"Exception when setting the data to CurrentInprogress node", e);
} catch (InterruptedException e) {
throw new IOException(
"Interrupted when setting the data to CurrentInprogress node", e);
}
LOG.info("Cleared the data from CurrentInprogress");
}
}

View File

@ -1,186 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Collections;
import java.util.Comparator;
import java.net.InetAddress;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Distributed lock, using ZooKeeper.
*
* The lock is vulnerable to timing issues. For example, the process could
* encounter a really long GC cycle between acquiring the lock, and writing to
* a ledger. This could have timed out the lock, and another process could have
* acquired the lock and started writing to bookkeeper. Therefore other
* mechanisms are required to ensure correctness (i.e. Fencing).
*/
class WriteLock implements Watcher {
static final Log LOG = LogFactory.getLog(WriteLock.class);
private final ZooKeeper zkc;
private final String lockpath;
private AtomicInteger lockCount = new AtomicInteger(0);
private String myznode = null;
WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
this.lockpath = lockpath;
this.zkc = zkc;
try {
if (zkc.exists(lockpath, false) == null) {
String localString = InetAddress.getLocalHost().toString();
zkc.create(lockpath, localString.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
void acquire() throws IOException {
while (true) {
if (lockCount.get() == 0) {
try {
synchronized(this) {
if (lockCount.get() > 0) {
lockCount.incrementAndGet();
return;
}
myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
if (LOG.isTraceEnabled()) {
LOG.trace("Acquiring lock, trying " + myznode);
}
List<String> nodes = zkc.getChildren(lockpath, false);
Collections.sort(nodes, new Comparator<String>() {
public int compare(String o1,
String o2) {
Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
return l1 - l2;
}
});
if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Lock acquired - " + myznode);
}
lockCount.set(1);
zkc.exists(myznode, this);
return;
} else {
LOG.error("Failed to acquire lock with " + myznode
+ ", " + nodes.get(0) + " already has it");
throw new IOException("Could not acquire lock");
}
}
} catch (KeeperException e) {
throw new IOException("Exception accessing Zookeeper", e);
} catch (InterruptedException ie) {
throw new IOException("Exception accessing Zookeeper", ie);
}
} else {
int ret = lockCount.getAndIncrement();
if (ret == 0) {
lockCount.decrementAndGet();
continue; // try again;
} else {
return;
}
}
}
}
void release() throws IOException {
try {
if (lockCount.decrementAndGet() <= 0) {
if (lockCount.get() < 0) {
LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+ lockCount.get());
}
synchronized(this) {
if (lockCount.get() <= 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("releasing lock " + myznode);
}
if (myznode != null) {
zkc.delete(myznode, -1);
myznode = null;
}
}
}
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
public void checkWriteLock() throws IOException {
if (!haveLock()) {
throw new IOException("Lost writer lock");
}
}
boolean haveLock() throws IOException {
return lockCount.get() > 0;
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Disconnected
|| event.getState() == KeeperState.Expired) {
LOG.warn("Lost zookeeper session, lost lock ");
lockCount.set(0);
} else {
// reapply the watch
synchronized (this) {
LOG.info("Zookeeper event " + event
+ " received, reapplying watch to " + myznode);
if (myznode != null) {
try {
zkc.exists(myznode, this);
} catch (Exception e) {
LOG.warn("Could not set watch on lock, releasing", e);
try {
release();
} catch (IOException ioe) {
LOG.error("Could not release Zk lock", ioe);
}
}
}
}
}
}
}

View File

@ -215,8 +215,7 @@ public void testFailoverWithFailingBKCluster() throws Exception {
}
/**
* Test that two namenodes can't become primary at the same
* time.
* Test that two namenodes can't continue as primary
*/
@Test
public void testMultiplePrimariesStarted() throws Exception {
@ -247,21 +246,17 @@ public void testMultiplePrimariesStarted() throws Exception {
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
fs.mkdirs(p1);
nn1.getRpcServer().rollEditLog();
try {
cluster.transitionToActive(1);
fail("Shouldn't have been able to start two primaries"
+ " with single shared storage");
} catch (ServiceFailedException sfe) {
assertTrue("Wrong exception",
sfe.getMessage().contains("Failed to start active services"));
}
cluster.transitionToActive(1);
fs = cluster.getFileSystem(0); // get the older active server.
// This edit log updation on older active should make older active
// shutdown.
fs.delete(p1, true);
verify(mockRuntime1, atLeastOnce()).exit(anyInt());
verify(mockRuntime2, times(0)).exit(anyInt());
} finally {
verify(mockRuntime1, times(0)).exit(anyInt());
verify(mockRuntime2, atLeastOnce()).exit(anyInt());
if (cluster != null) {
cluster.shutdown();
}
}
}
}
}

View File

@ -361,6 +361,7 @@ public void testAllBookieFailure() throws Exception {
assertEquals("New bookie didn't start",
numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests that read, update, clear api from CurrentInprogress
*/
public class TestCurrentInprogress {
private static final Log LOG = LogFactory.getLog(TestCurrentInprogress.class);
private static final String CURRENT_NODE_PATH = "/test";
private static final String HOSTPORT = "127.0.0.1:2181";
private static final int CONNECTION_TIMEOUT = 30000;
private static NIOServerCnxnFactory serverFactory;
private static ZooKeeperServer zks;
private static ZooKeeper zkc;
private static int ZooKeeperDefaultPort = 2181;
private static File zkTmpDir;
private static ZooKeeper connectZooKeeper(String ensemble)
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(10, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
@BeforeClass
public static void setupZooKeeper() throws Exception {
LOG.info("Starting ZK server");
zkTmpDir = File.createTempFile("zookeeper", "test");
zkTmpDir.delete();
zkTmpDir.mkdir();
try {
zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
}
boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
@AfterClass
public static void shutDownServer() {
if (null != zks) {
zks.shutdown();
}
zkTmpDir.delete();
}
@Before
public void setup() throws Exception {
zkc = connectZooKeeper(HOSTPORT);
}
@After
public void teardown() throws Exception {
if (null != zkc) {
zkc.close();
}
}
/**
* Tests that read should be able to read the data which updated with update
* api
*/
@Test
public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
String data = "inprogressNode";
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
ci.update(data);
String inprogressNodePath = ci.read();
assertEquals("Not returning inprogressZnode", "inprogressNode",
inprogressNodePath);
}
/**
* Tests that read should return null if we clear the updated data in
* CurrentInprogress node
*/
@Test
public void testReadShouldReturnNullAfterClear() throws Exception {
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
ci.update("myInprogressZnode");
ci.read();
ci.clear();
String inprogressNodePath = ci.read();
assertEquals("Expecting null to be return", null, inprogressNodePath);
}
/**
* Tests that update should throw IOE, if version number modifies between read
* and update
*/
@Test(expected = IOException.class)
public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
throws Exception {
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
ci.update("myInprogressZnode");
assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
.read());
// Updating data in-between to change the data to change the version number
ci.update("YourInprogressZnode");
ci.update("myInprogressZnode");
}
}