diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 610d45ca8c..1ec2bd2400 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -962,6 +962,8 @@ Release 2.7.0 - UNRELEASED HDFS-7776. Adding additional unit tests for Quota By Storage Type. (Xiaoyu Yao via Arpit Agarwal) + HDFS-4625. BKJM doesn't take advantage of speculative reads. (Rakesh R + via aajisaka) Release 2.6.1 - UNRELEASED diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 227be6b2c1..51905c07f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -59,6 +59,7 @@ import com.google.protobuf.TextFormat; import static com.google.common.base.Charsets.UTF_8; +import org.apache.commons.io.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.google.common.annotations.VisibleForTesting; @@ -142,6 +143,15 @@ public class BookKeeperJournalManager implements JournalManager { public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT = "/ledgers/available"; + public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS + = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs"; + public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT + = 2000; + + public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC + = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec"; + public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5; + private ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; @@ -153,6 +163,8 @@ public class BookKeeperJournalManager implements JournalManager { private final int ensembleSize; private final int quorumSize; private final String digestpw; + private final int speculativeReadTimeout; + private final int readEntryTimeout; private final CountDownLatch zkConnectLatch; private final NamespaceInfo nsInfo; private boolean initialized = false; @@ -172,6 +184,11 @@ public BookKeeperJournalManager(Configuration conf, URI uri, BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); + speculativeReadTimeout = conf.getInt( + BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, + BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT); + readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, + BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT); ledgerPath = basePath + "/ledgers"; String maxTxIdPath = basePath + "/maxtxid"; @@ -196,7 +213,10 @@ public BookKeeperJournalManager(Configuration conf, URI uri, } prepareBookKeeperEnv(); - bkc = new BookKeeper(new ClientConfiguration(), zkc); + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setSpeculativeReadTimeout(speculativeReadTimeout); + clientConf.setReadEntryTimeout(readEntryTimeout); + bkc = new BookKeeper(clientConf, zkc); } catch (KeeperException e) { throw new IOException("Error initializing zk", e); } catch (InterruptedException ie) { @@ -385,7 +405,7 @@ public EditLogOutputStream startLogSegment(long txId, int layoutVersion) } currentLedger = bkc.createLedger(ensembleSize, quorumSize, BookKeeper.DigestType.MAC, - digestpw.getBytes()); + digestpw.getBytes(Charsets.UTF_8)); } catch (BKException bke) { throw new IOException("Error creating ledger", bke); } catch (KeeperException ke) { @@ -522,10 +542,10 @@ public void selectInputStreams(Collection streams, LedgerHandle h; if (l.isInProgress()) { // we don't want to fence the current journal h = bkc.openLedgerNoRecovery(l.getLedgerId(), - BookKeeper.DigestType.MAC, digestpw.getBytes()); + BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8)); } else { h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, - digestpw.getBytes()); + digestpw.getBytes(Charsets.UTF_8)); } elis = new BookKeeperEditLogInputStream(h, l); elis.skipTo(fromTxId); @@ -732,11 +752,11 @@ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) if (fence) { lh = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, - digestpw.getBytes()); + digestpw.getBytes(Charsets.UTF_8)); } else { lh = bkc.openLedgerNoRecovery(l.getLedgerId(), BookKeeper.DigestType.MAC, - digestpw.getBytes()); + digestpw.getBytes(Charsets.UTF_8)); } } catch (BKException bke) { throw new IOException("Exception opening ledger for " + l, bke); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java new file mode 100644 index 0000000000..f5b86bc784 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestBookKeeperSpeculativeRead { + private static final Log LOG = LogFactory + .getLog(TestBookKeeperSpeculativeRead.class); + + private ZooKeeper zkc; + private static BKJMUtil bkutil; + private static int numLocalBookies = 1; + private static List bks = new ArrayList(); + + @BeforeClass + public static void setupBookkeeper() throws Exception { + bkutil = new BKJMUtil(1); + bkutil.start(); + } + + @AfterClass + public static void teardownBookkeeper() throws Exception { + bkutil.teardown(); + for (BookieServer bk : bks) { + bk.shutdown(); + } + } + + @Before + public void setup() throws Exception { + zkc = BKJMUtil.connectZooKeeper(); + } + + @After + public void teardown() throws Exception { + zkc.close(); + } + + private NamespaceInfo newNSInfo() { + Random r = new Random(); + return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1); + } + + /** + * Test speculative read feature supported by bookkeeper. Keep one bookie + * alive and sleep all the other bookies. Non spec client will hang for long + * time to read the entries from the bookkeeper. + */ + @Test(timeout = 120000) + public void testSpeculativeRead() throws Exception { + // starting 9 more servers + for (int i = 1; i < 10; i++) { + bks.add(bkutil.newBookie()); + } + NamespaceInfo nsi = newNSInfo(); + Configuration conf = new Configuration(); + int ensembleSize = numLocalBookies + 9; + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + conf.setInt( + BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, + 100); + // sets 60 minute + conf.setInt( + BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi); + bkjm.format(nsi); + + final long numTransactions = 1000; + EditLogOutputStream out = bkjm.startLogSegment(1, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + for (long i = 1; i <= numTransactions; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, numTransactions); + + List in = new ArrayList(); + bkjm.selectInputStreams(in, 1, true); + + // sleep 9 bk servers. Now only one server is running and responding to the + // clients + CountDownLatch sleepLatch = new CountDownLatch(1); + for (final BookieServer bookie : bks) { + sleepBookie(sleepLatch, bookie); + } + try { + assertEquals(numTransactions, + FSEditLogTestUtil.countTransactionsInStream(in.get(0))); + } finally { + in.get(0).close(); + sleepLatch.countDown(); + bkjm.close(); + } + } + + /** + * Sleep a bookie until I count down the latch + * + * @param latch + * latch to wait on + * @param bookie + * bookie server + * @throws Exception + */ + private void sleepBookie(final CountDownLatch latch, final BookieServer bookie) + throws Exception { + + Thread sleeper = new Thread() { + public void run() { + try { + bookie.suspendProcessing(); + latch.await(2, TimeUnit.MINUTES); + bookie.resumeProcessing(); + } catch (Exception e) { + LOG.error("Error suspending bookie", e); + } + } + }; + sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId()); + sleeper.start(); + } +}