HDFS-3810. Implement format() for BKJM. Contributed by Ivan Kelly.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1407182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8a1f333707
commit
f1fe91ec95
@ -560,6 +560,8 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
HDFS-3979. For hsync, datanode should wait for the local sync to complete
|
HDFS-3979. For hsync, datanode should wait for the local sync to complete
|
||||||
before sending ack. (Lars Hofhansl via szetszwo)
|
before sending ack. (Lars Hofhansl via szetszwo)
|
||||||
|
|
||||||
|
HDFS-3810. Implement format() for BKJM (Ivan Kelly via umamahesh)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.ZooDefs.Ids;
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
||||||
|
import org.apache.zookeeper.ZKUtil;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -46,6 +47,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@ -142,13 +144,16 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final BookKeeper bkc;
|
private final BookKeeper bkc;
|
||||||
private final CurrentInprogress ci;
|
private final CurrentInprogress ci;
|
||||||
|
private final String basePath;
|
||||||
private final String ledgerPath;
|
private final String ledgerPath;
|
||||||
|
private final String versionPath;
|
||||||
private final MaxTxId maxTxId;
|
private final MaxTxId maxTxId;
|
||||||
private final int ensembleSize;
|
private final int ensembleSize;
|
||||||
private final int quorumSize;
|
private final int quorumSize;
|
||||||
private final String digestpw;
|
private final String digestpw;
|
||||||
private final CountDownLatch zkConnectLatch;
|
private final CountDownLatch zkConnectLatch;
|
||||||
private final NamespaceInfo nsInfo;
|
private final NamespaceInfo nsInfo;
|
||||||
|
private boolean initialized = false;
|
||||||
private LedgerHandle currentLedger = null;
|
private LedgerHandle currentLedger = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -160,16 +165,16 @@ public BookKeeperJournalManager(Configuration conf, URI uri,
|
|||||||
this.nsInfo = nsInfo;
|
this.nsInfo = nsInfo;
|
||||||
|
|
||||||
String zkConnect = uri.getAuthority().replace(";", ",");
|
String zkConnect = uri.getAuthority().replace(";", ",");
|
||||||
String zkPath = uri.getPath();
|
basePath = uri.getPath();
|
||||||
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
|
||||||
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
|
||||||
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
|
||||||
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
|
||||||
|
|
||||||
ledgerPath = zkPath + "/ledgers";
|
ledgerPath = basePath + "/ledgers";
|
||||||
String maxTxIdPath = zkPath + "/maxtxid";
|
String maxTxIdPath = basePath + "/maxtxid";
|
||||||
String currentInprogressNodePath = zkPath + "/CurrentInprogress";
|
String currentInprogressNodePath = basePath + "/CurrentInprogress";
|
||||||
String versionPath = zkPath + "/version";
|
versionPath = basePath + "/version";
|
||||||
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
|
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
|
||||||
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
|
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
|
||||||
|
|
||||||
@ -180,47 +185,7 @@ public BookKeeperJournalManager(Configuration conf, URI uri,
|
|||||||
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
|
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
|
||||||
throw new IOException("Error connecting to zookeeper");
|
throw new IOException("Error connecting to zookeeper");
|
||||||
}
|
}
|
||||||
if (zkc.exists(zkPath, false) == null) {
|
|
||||||
zkc.create(zkPath, new byte[] {'0'},
|
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
Stat versionStat = zkc.exists(versionPath, false);
|
|
||||||
if (versionStat != null) {
|
|
||||||
byte[] d = zkc.getData(versionPath, false, versionStat);
|
|
||||||
VersionProto.Builder builder = VersionProto.newBuilder();
|
|
||||||
TextFormat.merge(new String(d, UTF_8), builder);
|
|
||||||
if (!builder.isInitialized()) {
|
|
||||||
throw new IOException("Invalid/Incomplete data in znode");
|
|
||||||
}
|
|
||||||
VersionProto vp = builder.build();
|
|
||||||
|
|
||||||
// There's only one version at the moment
|
|
||||||
assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
|
|
||||||
|
|
||||||
NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
|
|
||||||
|
|
||||||
if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
|
|
||||||
!nsInfo.clusterID.equals(readns.getClusterID()) ||
|
|
||||||
!nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
|
|
||||||
String err = String.format("Environment mismatch. Running process %s"
|
|
||||||
+", stored in ZK %s", nsInfo, readns);
|
|
||||||
LOG.error(err);
|
|
||||||
throw new IOException(err);
|
|
||||||
}
|
|
||||||
} else if (nsInfo.getNamespaceID() > 0) {
|
|
||||||
VersionProto.Builder builder = VersionProto.newBuilder();
|
|
||||||
builder.setNamespaceInfo(PBHelper.convert(nsInfo))
|
|
||||||
.setLayoutVersion(BKJM_LAYOUT_VERSION);
|
|
||||||
byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
|
|
||||||
zkc.create(versionPath, data,
|
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (zkc.exists(ledgerPath, false) == null) {
|
|
||||||
zkc.create(ledgerPath, new byte[] {'0'},
|
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
||||||
}
|
|
||||||
prepareBookKeeperEnv();
|
prepareBookKeeperEnv();
|
||||||
bkc = new BookKeeper(new ClientConfiguration(), zkc);
|
bkc = new BookKeeper(new ClientConfiguration(), zkc);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
@ -244,6 +209,7 @@ private void prepareBookKeeperEnv() throws IOException {
|
|||||||
BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
|
BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
|
||||||
final CountDownLatch zkPathLatch = new CountDownLatch(1);
|
final CountDownLatch zkPathLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
final AtomicBoolean success = new AtomicBoolean(false);
|
||||||
StringCallback callback = new StringCallback() {
|
StringCallback callback = new StringCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void processResult(int rc, String path, Object ctx, String name) {
|
public void processResult(int rc, String path, Object ctx, String name) {
|
||||||
@ -251,22 +217,23 @@ public void processResult(int rc, String path, Object ctx, String name) {
|
|||||||
|| KeeperException.Code.NODEEXISTS.intValue() == rc) {
|
|| KeeperException.Code.NODEEXISTS.intValue() == rc) {
|
||||||
LOG.info("Successfully created bookie available path : "
|
LOG.info("Successfully created bookie available path : "
|
||||||
+ zkAvailablePath);
|
+ zkAvailablePath);
|
||||||
zkPathLatch.countDown();
|
success.set(true);
|
||||||
} else {
|
} else {
|
||||||
KeeperException.Code code = KeeperException.Code.get(rc);
|
KeeperException.Code code = KeeperException.Code.get(rc);
|
||||||
LOG
|
LOG.error("Error : "
|
||||||
.error("Error : "
|
|
||||||
+ KeeperException.create(code, path).getMessage()
|
+ KeeperException.create(code, path).getMessage()
|
||||||
+ ", failed to create bookie available path : "
|
+ ", failed to create bookie available path : "
|
||||||
+ zkAvailablePath);
|
+ zkAvailablePath);
|
||||||
}
|
}
|
||||||
|
zkPathLatch.countDown();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
|
ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) {
|
if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
|
||||||
|
|| !success.get()) {
|
||||||
throw new IOException("Couldn't create bookie available path :"
|
throw new IOException("Couldn't create bookie available path :"
|
||||||
+ zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
|
+ zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
|
||||||
+ " millis");
|
+ " millis");
|
||||||
@ -281,19 +248,101 @@ public void processResult(int rc, String path, Object ctx, String name) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void format(NamespaceInfo ns) throws IOException {
|
public void format(NamespaceInfo ns) throws IOException {
|
||||||
// Currently, BKJM automatically formats itself when first accessed.
|
try {
|
||||||
// TODO: change over to explicit formatting so that the admin can
|
// delete old info
|
||||||
// clear out the BK storage when reformatting a cluster.
|
Stat baseStat = null;
|
||||||
LOG.info("Not formatting " + this + " - BKJM does not currently " +
|
Stat ledgerStat = null;
|
||||||
"support reformatting. If it has not been used before, it will" +
|
if ((baseStat = zkc.exists(basePath, false)) != null) {
|
||||||
"be formatted automatically upon first use.");
|
if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
|
||||||
|
for (EditLogLedgerMetadata l : getLedgerList(true)) {
|
||||||
|
try {
|
||||||
|
bkc.deleteLedger(l.getLedgerId());
|
||||||
|
} catch (BKException.BKNoSuchLedgerExistsException bke) {
|
||||||
|
LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
|
||||||
|
+ " Cannot delete.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ZKUtil.deleteRecursive(zkc, basePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
// should be clean now.
|
||||||
|
zkc.create(basePath, new byte[] {'0'},
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
VersionProto.Builder builder = VersionProto.newBuilder();
|
||||||
|
builder.setNamespaceInfo(PBHelper.convert(ns))
|
||||||
|
.setLayoutVersion(BKJM_LAYOUT_VERSION);
|
||||||
|
|
||||||
|
byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
|
||||||
|
zkc.create(versionPath, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
zkc.create(ledgerPath, new byte[] {'0'},
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
LOG.error("Error accessing zookeeper to format", ke);
|
||||||
|
throw new IOException("Error accessing zookeeper to format", ke);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException("Interrupted during format", ie);
|
||||||
|
} catch (BKException bke) {
|
||||||
|
throw new IOException("Error cleaning up ledgers during format", bke);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasSomeData() throws IOException {
|
public boolean hasSomeData() throws IOException {
|
||||||
// Don't confirm format on BKJM, since format() is currently a
|
try {
|
||||||
// no-op anyway
|
return zkc.exists(basePath, false) != null;
|
||||||
return false;
|
} catch (KeeperException ke) {
|
||||||
|
throw new IOException("Couldn't contact zookeeper", ke);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException("Interrupted while checking for data", ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized private void checkEnv() throws IOException {
|
||||||
|
if (!initialized) {
|
||||||
|
try {
|
||||||
|
Stat versionStat = zkc.exists(versionPath, false);
|
||||||
|
if (versionStat == null) {
|
||||||
|
throw new IOException("Environment not initialized. "
|
||||||
|
+"Have you forgotten to format?");
|
||||||
|
}
|
||||||
|
byte[] d = zkc.getData(versionPath, false, versionStat);
|
||||||
|
|
||||||
|
VersionProto.Builder builder = VersionProto.newBuilder();
|
||||||
|
TextFormat.merge(new String(d, UTF_8), builder);
|
||||||
|
if (!builder.isInitialized()) {
|
||||||
|
throw new IOException("Invalid/Incomplete data in znode");
|
||||||
|
}
|
||||||
|
VersionProto vp = builder.build();
|
||||||
|
|
||||||
|
// There's only one version at the moment
|
||||||
|
assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
|
||||||
|
|
||||||
|
NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
|
||||||
|
|
||||||
|
if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
|
||||||
|
!nsInfo.clusterID.equals(readns.getClusterID()) ||
|
||||||
|
!nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
|
||||||
|
String err = String.format("Environment mismatch. Running process %s"
|
||||||
|
+", stored in ZK %s", nsInfo, readns);
|
||||||
|
LOG.error(err);
|
||||||
|
throw new IOException(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
ci.init();
|
||||||
|
initialized = true;
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
throw new IOException("Cannot access ZooKeeper", ke);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException("Interrupted while checking environment", ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -307,6 +356,8 @@ public boolean hasSomeData() throws IOException {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
||||||
|
checkEnv();
|
||||||
|
|
||||||
if (txId <= maxTxId.get()) {
|
if (txId <= maxTxId.get()) {
|
||||||
throw new IOException("We've already seen " + txId
|
throw new IOException("We've already seen " + txId
|
||||||
+ ". A new stream cannot be created with it");
|
+ ". A new stream cannot be created with it");
|
||||||
@ -384,6 +435,8 @@ private void cleanupLedger(LedgerHandle lh) {
|
|||||||
@Override
|
@Override
|
||||||
public void finalizeLogSegment(long firstTxId, long lastTxId)
|
public void finalizeLogSegment(long firstTxId, long lastTxId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
checkEnv();
|
||||||
|
|
||||||
String inprogressPath = inprogressZNode(firstTxId);
|
String inprogressPath = inprogressZNode(firstTxId);
|
||||||
try {
|
try {
|
||||||
Stat inprogressStat = zkc.exists(inprogressPath, false);
|
Stat inprogressStat = zkc.exists(inprogressPath, false);
|
||||||
@ -537,6 +590,8 @@ long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recoverUnfinalizedSegments() throws IOException {
|
public void recoverUnfinalizedSegments() throws IOException {
|
||||||
|
checkEnv();
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
try {
|
try {
|
||||||
List<String> children = zkc.getChildren(ledgerPath, false);
|
List<String> children = zkc.getChildren(ledgerPath, false);
|
||||||
@ -589,6 +644,8 @@ public void recoverUnfinalizedSegments() throws IOException {
|
|||||||
@Override
|
@Override
|
||||||
public void purgeLogsOlderThan(long minTxIdToKeep)
|
public void purgeLogsOlderThan(long minTxIdToKeep)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
checkEnv();
|
||||||
|
|
||||||
for (EditLogLedgerMetadata l : getLedgerList(false)) {
|
for (EditLogLedgerMetadata l : getLedgerList(false)) {
|
||||||
if (l.getLastTxId() < minTxIdToKeep) {
|
if (l.getLastTxId() < minTxIdToKeep) {
|
||||||
try {
|
try {
|
||||||
|
@ -56,6 +56,9 @@ class CurrentInprogress {
|
|||||||
CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
|
CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
|
||||||
this.currentInprogressNode = lockpath;
|
this.currentInprogressNode = lockpath;
|
||||||
this.zkc = zkc;
|
this.zkc = zkc;
|
||||||
|
}
|
||||||
|
|
||||||
|
void init() throws IOException {
|
||||||
try {
|
try {
|
||||||
Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
|
Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
|
||||||
false);
|
false);
|
||||||
@ -96,15 +99,14 @@ void update(String path) throws IOException {
|
|||||||
this.versionNumberForPermission);
|
this.versionNumberForPermission);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Exception when setting the data "
|
throw new IOException("Exception when setting the data "
|
||||||
+ "[layout version number,hostname,inprogressNode path]= [" + content
|
+ "[" + content + "] to CurrentInprogress. ", e);
|
||||||
+ "] to CurrentInprogress. ", e);
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IOException("Interrupted while setting the data "
|
throw new IOException("Interrupted while setting the data "
|
||||||
+ "[layout version number,hostname,inprogressNode path]= [" + content
|
+ "[" + content + "] to CurrentInprogress", e);
|
||||||
+ "] to CurrentInprogress", e);
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Updated data[" + content + "] to CurrentInprogress");
|
||||||
}
|
}
|
||||||
LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
|
|
||||||
+ "= [" + content + "] to CurrentInprogress");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -136,7 +138,7 @@ String read() throws IOException {
|
|||||||
}
|
}
|
||||||
return builder.build().getPath();
|
return builder.build().getPath();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("No data available in CurrentInprogress");
|
LOG.debug("No data available in CurrentInprogress");
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -152,7 +154,7 @@ void clear() throws IOException {
|
|||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Interrupted when setting the data to CurrentInprogress node", e);
|
"Interrupted when setting the data to CurrentInprogress node", e);
|
||||||
}
|
}
|
||||||
LOG.info("Cleared the data from CurrentInprogress");
|
LOG.debug("Cleared the data from CurrentInprogress");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -149,6 +149,7 @@ public void testWithConfiguringBKAvailablePath() throws Exception {
|
|||||||
bkjm = new BookKeeperJournalManager(conf,
|
bkjm = new BookKeeperJournalManager(conf,
|
||||||
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
|
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
Assert.assertNotNull("Bookie available path : " + bkAvailablePath
|
Assert.assertNotNull("Bookie available path : " + bkAvailablePath
|
||||||
+ " doesn't exists", zkc.exists(bkAvailablePath, false));
|
+ " doesn't exists", zkc.exists(bkAvailablePath, false));
|
||||||
}
|
}
|
||||||
@ -166,6 +167,7 @@ public void testDefaultBKAvailablePath() throws Exception {
|
|||||||
bkjm = new BookKeeperJournalManager(conf,
|
bkjm = new BookKeeperJournalManager(conf,
|
||||||
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
|
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
|
Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
|
||||||
+ " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
|
+ " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
|
||||||
}
|
}
|
||||||
|
@ -29,8 +29,16 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||||
@ -90,6 +98,7 @@ public void testSimpleWrite() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1 ; i <= 100; i++) {
|
for (long i = 1 ; i <= 100; i++) {
|
||||||
@ -112,6 +121,8 @@ public void testNumberOfTransactions() throws Exception {
|
|||||||
|
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1 ; i <= 100; i++) {
|
for (long i = 1 ; i <= 100; i++) {
|
||||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||||
@ -130,6 +141,7 @@ public void testNumberOfTransactionsWithGaps() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
long txid = 1;
|
long txid = 1;
|
||||||
for (long i = 0; i < 3; i++) {
|
for (long i = 0; i < 3; i++) {
|
||||||
@ -167,6 +179,7 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
long txid = 1;
|
long txid = 1;
|
||||||
for (long i = 0; i < 3; i++) {
|
for (long i = 0; i < 3; i++) {
|
||||||
@ -208,6 +221,7 @@ public void testWriteRestartFrom1() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
long txid = 1;
|
long txid = 1;
|
||||||
long start = txid;
|
long start = txid;
|
||||||
@ -266,6 +280,7 @@ public void testTwoWriters() throws Exception {
|
|||||||
|
|
||||||
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
|
||||||
|
bkjm1.format(nsi);
|
||||||
|
|
||||||
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
|
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
|
||||||
@ -288,6 +303,7 @@ public void testSimpleRead() throws Exception {
|
|||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
|
BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
final long numTransactions = 10000;
|
final long numTransactions = 10000;
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
@ -315,6 +331,7 @@ public void testSimpleRecovery() throws Exception {
|
|||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
|
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1 ; i <= 100; i++) {
|
for (long i = 1 ; i <= 100; i++) {
|
||||||
@ -365,6 +382,7 @@ public void testAllBookieFailure() throws Exception {
|
|||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
|
BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
||||||
|
|
||||||
for (long i = 1 ; i <= 3; i++) {
|
for (long i = 1 ; i <= 3; i++) {
|
||||||
@ -450,6 +468,7 @@ public void testOneBookieFailure() throws Exception {
|
|||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||||
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
|
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
EditLogOutputStream out = bkjm.startLogSegment(txid);
|
||||||
for (long i = 1 ; i <= 3; i++) {
|
for (long i = 1 ; i <= 3; i++) {
|
||||||
@ -500,6 +519,7 @@ public void testEmptyInprogressNode() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1; i <= 100; i++) {
|
for (long i = 1; i <= 100; i++) {
|
||||||
@ -541,6 +561,7 @@ public void testCorruptInprogressNode() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1; i <= 100; i++) {
|
for (long i = 1; i <= 100; i++) {
|
||||||
@ -583,6 +604,7 @@ public void testEmptyInprogressLedger() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1; i <= 100; i++) {
|
for (long i = 1; i <= 100; i++) {
|
||||||
@ -622,6 +644,7 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||||
for (long i = 1; i <= 100; i++) {
|
for (long i = 1; i <= 100; i++) {
|
||||||
@ -669,6 +692,7 @@ public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
|
|||||||
NamespaceInfo nsi = newNSInfo();
|
NamespaceInfo nsi = newNSInfo();
|
||||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
|
||||||
nsi);
|
nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// start new inprogress log segment with txid=1
|
// start new inprogress log segment with txid=1
|
||||||
@ -697,6 +721,81 @@ public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private enum ThreadStatus {
|
||||||
|
COMPLETED, GOODEXCEPTION, BADEXCEPTION;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that concurrent calls to format will still allow one to succeed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testConcurrentFormat() throws Exception {
|
||||||
|
final URI uri = BKJMUtil.createJournalURI("/hdfsjournal-concurrentformat");
|
||||||
|
final NamespaceInfo nsi = newNSInfo();
|
||||||
|
|
||||||
|
// populate with data first
|
||||||
|
BookKeeperJournalManager bkjm
|
||||||
|
= new BookKeeperJournalManager(conf, uri, nsi);
|
||||||
|
bkjm.format(nsi);
|
||||||
|
for (int i = 1; i < 100*2; i += 2) {
|
||||||
|
bkjm.startLogSegment(i);
|
||||||
|
bkjm.finalizeLogSegment(i, i+1);
|
||||||
|
}
|
||||||
|
bkjm.close();
|
||||||
|
|
||||||
|
final int numThreads = 40;
|
||||||
|
List<Callable<ThreadStatus>> threads
|
||||||
|
= new ArrayList<Callable<ThreadStatus>>();
|
||||||
|
final CyclicBarrier barrier = new CyclicBarrier(numThreads);
|
||||||
|
|
||||||
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
threads.add(new Callable<ThreadStatus>() {
|
||||||
|
public ThreadStatus call() {
|
||||||
|
BookKeeperJournalManager bkjm = null;
|
||||||
|
try {
|
||||||
|
bkjm = new BookKeeperJournalManager(conf, uri, nsi);
|
||||||
|
barrier.await();
|
||||||
|
bkjm.format(nsi);
|
||||||
|
return ThreadStatus.COMPLETED;
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("Exception formatting ", ioe);
|
||||||
|
return ThreadStatus.GOODEXCEPTION;
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.error("Interrupted. Something is broken", ie);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
return ThreadStatus.BADEXCEPTION;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Some other bad exception", e);
|
||||||
|
return ThreadStatus.BADEXCEPTION;
|
||||||
|
} finally {
|
||||||
|
if (bkjm != null) {
|
||||||
|
try {
|
||||||
|
bkjm.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Error closing journal manager", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ExecutorService service = Executors.newFixedThreadPool(numThreads);
|
||||||
|
List<Future<ThreadStatus>> statuses = service.invokeAll(threads, 60,
|
||||||
|
TimeUnit.SECONDS);
|
||||||
|
int numCompleted = 0;
|
||||||
|
for (Future<ThreadStatus> s : statuses) {
|
||||||
|
assertTrue(s.isDone());
|
||||||
|
assertTrue("Thread threw invalid exception",
|
||||||
|
s.get() == ThreadStatus.COMPLETED
|
||||||
|
|| s.get() == ThreadStatus.GOODEXCEPTION);
|
||||||
|
if (s.get() == ThreadStatus.COMPLETED) {
|
||||||
|
numCompleted++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Completed " + numCompleted + " formats");
|
||||||
|
assertTrue("No thread managed to complete formatting", numCompleted > 0);
|
||||||
|
}
|
||||||
|
|
||||||
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
|
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
|
||||||
int startTxid, int endTxid) throws IOException, KeeperException,
|
int startTxid, int endTxid) throws IOException, KeeperException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
@ -118,6 +118,7 @@ public void teardown() throws Exception {
|
|||||||
public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
|
public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
|
||||||
String data = "inprogressNode";
|
String data = "inprogressNode";
|
||||||
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
||||||
|
ci.init();
|
||||||
ci.update(data);
|
ci.update(data);
|
||||||
String inprogressNodePath = ci.read();
|
String inprogressNodePath = ci.read();
|
||||||
assertEquals("Not returning inprogressZnode", "inprogressNode",
|
assertEquals("Not returning inprogressZnode", "inprogressNode",
|
||||||
@ -131,6 +132,7 @@ public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testReadShouldReturnNullAfterClear() throws Exception {
|
public void testReadShouldReturnNullAfterClear() throws Exception {
|
||||||
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
||||||
|
ci.init();
|
||||||
ci.update("myInprogressZnode");
|
ci.update("myInprogressZnode");
|
||||||
ci.read();
|
ci.read();
|
||||||
ci.clear();
|
ci.clear();
|
||||||
@ -146,6 +148,7 @@ public void testReadShouldReturnNullAfterClear() throws Exception {
|
|||||||
public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
|
public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
|
||||||
|
ci.init();
|
||||||
ci.update("myInprogressZnode");
|
ci.update("myInprogressZnode");
|
||||||
assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
|
assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
|
||||||
.read());
|
.read());
|
||||||
|
Loading…
Reference in New Issue
Block a user