HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1340750 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-05-20 15:14:53 +00:00
parent a37aa28e4a
commit 75041b9950
3 changed files with 278 additions and 47 deletions

View File

@ -33,6 +33,9 @@
import org.apache.hadoop.io.DataOutputBuffer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Output stream for BookKeeper Journal.
* Multiple complete edit log entries are packed into a single bookkeeper
@ -44,11 +47,15 @@
*/
class BookKeeperEditLogOutputStream
extends EditLogOutputStream implements AddCallback {
static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class);
private final DataOutputBuffer bufCurrent;
private final AtomicInteger outstandingRequests;
private final int transmissionThreshold;
private final LedgerHandle lh;
private CountDownLatch syncLatch;
private final AtomicInteger transmitResult
= new AtomicInteger(BKException.Code.OK);
private final WriteLock wl;
private final Writer writer;
@ -141,6 +148,11 @@ public void flushAndSync() throws IOException {
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on latch", ie);
}
if (transmitResult.get() != BKException.Code.OK) {
throw new IOException("Failed to write to bookkeeper; Error is ("
+ transmitResult.get() + ") "
+ BKException.getMessage(transmitResult.get()));
}
syncLatch = null;
// wait for whatever we wait on
@ -154,6 +166,12 @@ public void flushAndSync() throws IOException {
private void transmit() throws IOException {
wl.checkWriteLock();
if (!transmitResult.compareAndSet(BKException.Code.OK,
BKException.Code.OK)) {
throw new IOException("Trying to write to an errored stream;"
+ " Error code : (" + transmitResult.get()
+ ") " + BKException.getMessage(transmitResult.get()));
}
if (bufCurrent.getLength() > 0) {
byte[] entry = Arrays.copyOf(bufCurrent.getData(),
bufCurrent.getLength());
@ -168,6 +186,12 @@ public void addComplete(int rc, LedgerHandle handle,
long entryId, Object ctx) {
synchronized(this) {
outstandingRequests.decrementAndGet();
if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) {
LOG.warn("Tried to set transmit result to (" + rc + ") \""
+ BKException.getMessage(rc) + "\""
+ " but is already (" + transmitResult.get() + ") \""
+ BKException.getMessage(transmitResult.get()) + "\"");
}
CountDownLatch l = syncLatch;
if (l != null) {
l.countDown();

View File

@ -61,7 +61,7 @@
* </property>
*
* <property>
* <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
* <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
* <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
* </property>
* }
@ -212,15 +212,15 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
if (currentLedger != null) {
throw new IOException("Already writing to a ledger, id="
+ currentLedger.getId());
}
try {
if (currentLedger != null) {
// bookkeeper errored on last stream, clean up ledger
currentLedger.close();
}
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC,
digestpw.getBytes());
String znodePath = inprogressZNode();
String znodePath = inprogressZNode(txId);
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode
@ -258,7 +258,7 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
@Override
public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
String inprogressPath = inprogressZNode();
String inprogressPath = inprogressZNode(firstTxId);
try {
Stat inprogressStat = zkc.exists(inprogressPath, false);
if (inprogressStat == null) {
@ -372,21 +372,33 @@ public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
@Override
public void recoverUnfinalizedSegments() throws IOException {
wl.acquire();
synchronized (this) {
try {
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressZNode());
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + inprogressZNode()
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption, please check logs.");
List<String> children = zkc.getChildren(ledgerPath, false);
for (String child : children) {
if (!child.startsWith("inprogress_")) {
continue;
}
String znode = ledgerPath + "/" + child;
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, znode);
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + znode
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption,"
+ " please check logs.");
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
} catch (KeeperException ke) {
throw new IOException("Couldn't get list of inprogress segments", ke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted getting list of inprogress segments",
ie);
} finally {
if (wl.haveLock()) {
wl.release();
@ -495,8 +507,8 @@ String finalizedLedgerZNode(long startTxId, long endTxId) {
/**
* Get the znode path for the inprogressZNode
*/
String inprogressZNode() {
return ledgerPath + "/inprogress";
String inprogressZNode(long startTxid) {
return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
}
/**

View File

@ -28,6 +28,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.LocalBookKeeper;
import java.io.RandomAccessFile;
@ -74,11 +76,15 @@ public class TestBookKeeperJournalManager {
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final String zkEnsemble = "localhost:2181";
final static private int numBookies = 5;
private static Thread bkthread;
protected static Configuration conf = new Configuration();
private ZooKeeper zkc;
static int nextPort = 6000; // next port for additionally created bookies
private static ZooKeeper connectZooKeeper(String ensemble)
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
@ -96,9 +102,72 @@ public void process(WatchedEvent event) {
return zkc;
}
private static BookieServer newBookie() throws Exception {
int port = nextPort++;
ServerConfiguration bookieConf = new ServerConfiguration();
bookieConf.setBookiePort(port);
File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
"test");
tmpdir.delete();
tmpdir.mkdir();
bookieConf.setZkServers(zkEnsemble);
bookieConf.setJournalDirName(tmpdir.getPath());
bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
BookieServer b = new BookieServer(bookieConf);
b.start();
for (int i = 0; i < 10 && !b.isRunning(); i++) {
Thread.sleep(10000);
}
if (!b.isRunning()) {
throw new IOException("Bookie would not start");
}
return b;
}
/**
* Check that a number of bookies are available
* @param count number of bookies required
* @param timeout number of seconds to wait for bookies to start
* @throws IOException if bookies are not started by the time the timeout hits
*/
private static int checkBookiesUp(int count, int timeout) throws Exception {
ZooKeeper zkc = connectZooKeeper(zkEnsemble);
try {
boolean up = false;
int mostRecentSize = 0;
for (int i = 0; i < timeout; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
mostRecentSize = children.size();
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + mostRecentSize + " bookies up, "
+ "waiting for " + count);
if (LOG.isTraceEnabled()) {
for (String child : children) {
LOG.trace(" server: " + child);
}
}
}
if (mostRecentSize == count) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
return mostRecentSize;
} finally {
zkc.close();
}
}
@BeforeClass
public static void setupBookkeeper() throws Exception {
final int numBookies = 5;
bkthread = new Thread() {
public void run() {
try {
@ -118,29 +187,8 @@ public void run() {
if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
throw new Exception("Error starting zookeeper/bookkeeper");
}
ZooKeeper zkc = connectZooKeeper(zkEnsemble);
try {
boolean up = false;
for (int i = 0; i < 10; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
if (children.size() == numBookies) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
if (!up) {
throw new IOException("Not enough bookies started");
}
} finally {
zkc.close();
}
assertEquals("Not all bookies started",
numBookies, checkBookiesUp(numBookies, 10));
}
@Before
@ -178,7 +226,7 @@ public void testSimpleWrite() throws Exception {
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull(zkc.exists(zkpath, false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
}
@Test
@ -385,11 +433,158 @@ public void testSimpleRecovery() throws Exception {
assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
bkjm.recoverUnfinalizedSegments();
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
}
}
/**
* Test that if enough bookies fail to prevent an ensemble,
* writes the bookkeeper will fail. Test that when once again
* an ensemble is available, it can continue to write.
*/
@Test
public void testAllBookieFailure() throws Exception {
BookieServer bookieToFail = newBookie();
BookieServer replacementBookie = null;
try {
int ensembleSize = numBookies + 1;
assertEquals("New bookie didn't start",
ensembleSize, checkBookiesUp(ensembleSize, 10));
// ensure that the journal manager has to use all bookies,
// so that a failure will fail the journal manager
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble
+ "/hdfsjournal-allbookiefailure"));
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
bookieToFail.shutdown();
assertEquals("New bookie didn't die",
numBookies, checkBookiesUp(numBookies, 10));
try {
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
fail("should not get to this stage");
} catch (IOException ioe) {
LOG.debug("Error writing to bookkeeper", ioe);
assertTrue("Invalid exception message",
ioe.getMessage().contains("Failed to write to bookkeeper"));
}
replacementBookie = newBookie();
assertEquals("New bookie didn't start",
numBookies+1, checkBookiesUp(numBookies+1, 10));
out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
} catch (Exception e) {
LOG.error("Exception in test", e);
throw e;
} finally {
if (replacementBookie != null) {
replacementBookie.shutdown();
}
bookieToFail.shutdown();
if (checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
}
}
}
/**
* Test that a BookKeeper JM can continue to work across the
* failure of a bookie. This should be handled transparently
* by bookkeeper.
*/
@Test
public void testOneBookieFailure() throws Exception {
BookieServer bookieToFail = newBookie();
BookieServer replacementBookie = null;
try {
int ensembleSize = numBookies + 1;
assertEquals("New bookie didn't start",
ensembleSize, checkBookiesUp(ensembleSize, 10));
// ensure that the journal manager has to use all bookies,
// so that a failure will fail the journal manager
Configuration conf = new Configuration();
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble
+ "/hdfsjournal-onebookiefailure"));
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
replacementBookie = newBookie();
assertEquals("replacement bookie didn't start",
ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
bookieToFail.shutdown();
assertEquals("New bookie didn't die",
ensembleSize, checkBookiesUp(ensembleSize, 10));
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
} catch (Exception e) {
LOG.error("Exception in test", e);
throw e;
} finally {
if (replacementBookie != null) {
replacementBookie.shutdown();
}
bookieToFail.shutdown();
if (checkBookiesUp(numBookies, 30) != numBookies) {
LOG.warn("Not all bookies from this test shut down, expect errors");
}
}
}
}