HDFS-4265. BKJM doesn't take advantage of speculative reads. Contributed by Rakesh R.

This commit is contained in:
Akira Ajisaka 2015-02-13 15:20:52 -08:00
parent 19be82cd16
commit 0d521e3326
3 changed files with 195 additions and 6 deletions

View File

@ -962,6 +962,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7776. Adding additional unit tests for Quota By Storage Type. HDFS-7776. Adding additional unit tests for Quota By Storage Type.
(Xiaoyu Yao via Arpit Agarwal) (Xiaoyu Yao via Arpit Agarwal)
HDFS-4625. BKJM doesn't take advantage of speculative reads. (Rakesh R
via aajisaka)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED

View File

@ -59,6 +59,7 @@
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -142,6 +143,15 @@ public class BookKeeperJournalManager implements JournalManager {
public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
= "/ledgers/available"; = "/ledgers/available";
public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
= "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
= 2000;
public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
= "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
private ZooKeeper zkc; private ZooKeeper zkc;
private final Configuration conf; private final Configuration conf;
private final BookKeeper bkc; private final BookKeeper bkc;
@ -153,6 +163,8 @@ public class BookKeeperJournalManager implements JournalManager {
private final int ensembleSize; private final int ensembleSize;
private final int quorumSize; private final int quorumSize;
private final String digestpw; private final String digestpw;
private final int speculativeReadTimeout;
private final int readEntryTimeout;
private final CountDownLatch zkConnectLatch; private final CountDownLatch zkConnectLatch;
private final NamespaceInfo nsInfo; private final NamespaceInfo nsInfo;
private boolean initialized = false; private boolean initialized = false;
@ -172,6 +184,11 @@ public BookKeeperJournalManager(Configuration conf, URI uri,
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
speculativeReadTimeout = conf.getInt(
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
ledgerPath = basePath + "/ledgers"; ledgerPath = basePath + "/ledgers";
String maxTxIdPath = basePath + "/maxtxid"; String maxTxIdPath = basePath + "/maxtxid";
@ -196,7 +213,10 @@ public BookKeeperJournalManager(Configuration conf, URI uri,
} }
prepareBookKeeperEnv(); prepareBookKeeperEnv();
bkc = new BookKeeper(new ClientConfiguration(), zkc); ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
clientConf.setReadEntryTimeout(readEntryTimeout);
bkc = new BookKeeper(clientConf, zkc);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Error initializing zk", e); throw new IOException("Error initializing zk", e);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -385,7 +405,7 @@ public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
} }
currentLedger = bkc.createLedger(ensembleSize, quorumSize, currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC, BookKeeper.DigestType.MAC,
digestpw.getBytes()); digestpw.getBytes(Charsets.UTF_8));
} catch (BKException bke) { } catch (BKException bke) {
throw new IOException("Error creating ledger", bke); throw new IOException("Error creating ledger", bke);
} catch (KeeperException ke) { } catch (KeeperException ke) {
@ -522,10 +542,10 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
LedgerHandle h; LedgerHandle h;
if (l.isInProgress()) { // we don't want to fence the current journal if (l.isInProgress()) { // we don't want to fence the current journal
h = bkc.openLedgerNoRecovery(l.getLedgerId(), h = bkc.openLedgerNoRecovery(l.getLedgerId(),
BookKeeper.DigestType.MAC, digestpw.getBytes()); BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
} else { } else {
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
digestpw.getBytes()); digestpw.getBytes(Charsets.UTF_8));
} }
elis = new BookKeeperEditLogInputStream(h, l); elis = new BookKeeperEditLogInputStream(h, l);
elis.skipTo(fromTxId); elis.skipTo(fromTxId);
@ -732,11 +752,11 @@ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
if (fence) { if (fence) {
lh = bkc.openLedger(l.getLedgerId(), lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC, BookKeeper.DigestType.MAC,
digestpw.getBytes()); digestpw.getBytes(Charsets.UTF_8));
} else { } else {
lh = bkc.openLedgerNoRecovery(l.getLedgerId(), lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
BookKeeper.DigestType.MAC, BookKeeper.DigestType.MAC,
digestpw.getBytes()); digestpw.getBytes(Charsets.UTF_8));
} }
} catch (BKException bke) { } catch (BKException bke) {
throw new IOException("Exception opening ledger for " + l, bke); throw new IOException("Exception opening ledger for " + l, bke);

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestBookKeeperSpeculativeRead {
private static final Log LOG = LogFactory
.getLog(TestBookKeeperSpeculativeRead.class);
private ZooKeeper zkc;
private static BKJMUtil bkutil;
private static int numLocalBookies = 1;
private static List<BookieServer> bks = new ArrayList<BookieServer>();
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(1);
bkutil.start();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
bkutil.teardown();
for (BookieServer bk : bks) {
bk.shutdown();
}
}
@Before
public void setup() throws Exception {
zkc = BKJMUtil.connectZooKeeper();
}
@After
public void teardown() throws Exception {
zkc.close();
}
private NamespaceInfo newNSInfo() {
Random r = new Random();
return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
}
/**
* Test speculative read feature supported by bookkeeper. Keep one bookie
* alive and sleep all the other bookies. Non spec client will hang for long
* time to read the entries from the bookkeeper.
*/
@Test(timeout = 120000)
public void testSpeculativeRead() throws Exception {
// starting 9 more servers
for (int i = 1; i < 10; i++) {
bks.add(bkutil.newBookie());
}
NamespaceInfo nsi = newNSInfo();
Configuration conf = new Configuration();
int ensembleSize = numLocalBookies + 9;
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
ensembleSize);
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
conf.setInt(
BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
100);
// sets 60 minute
conf.setInt(
BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
bkjm.format(nsi);
final long numTransactions = 1000;
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, numTransactions);
List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
bkjm.selectInputStreams(in, 1, true);
// sleep 9 bk servers. Now only one server is running and responding to the
// clients
CountDownLatch sleepLatch = new CountDownLatch(1);
for (final BookieServer bookie : bks) {
sleepBookie(sleepLatch, bookie);
}
try {
assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
} finally {
in.get(0).close();
sleepLatch.countDown();
bkjm.close();
}
}
/**
* Sleep a bookie until I count down the latch
*
* @param latch
* latch to wait on
* @param bookie
* bookie server
* @throws Exception
*/
private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
throws Exception {
Thread sleeper = new Thread() {
public void run() {
try {
bookie.suspendProcessing();
latch.await(2, TimeUnit.MINUTES);
bookie.resumeProcessing();
} catch (Exception e) {
LOG.error("Error suspending bookie", e);
}
}
};
sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
sleeper.start();
}
}