HDFS-3741. Exhaustive failure injection test for skipped RPCs. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1370497 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-08-07 20:24:01 +00:00
parent e08604907c
commit 3a53ef4a80
10 changed files with 427 additions and 166 deletions

CHANGES.HDFS-3077.txt:

@@ -10,3 +10,5 @@ HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
 HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
 HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
+HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)

AsyncLogger.java:

@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;

+import java.net.InetSocketAddress;
 import java.net.URL;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +46,11 @@
  */
 interface AsyncLogger {

+  interface Factory {
+    AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr);
+  }
+
   /**
    * Send a batch of edits to the logger.
    * @param firstTxnId the first txid of the edits.

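The Factory interface added above is the seam the rest of this change threads through the client: loggers are no longer constructed directly, so a test can substitute an instrumented implementation. A minimal sketch of a custom factory (hypothetical test code, not part of this commit; any AsyncLogger implementation could be returned):

    AsyncLogger.Factory testFactory = new AsyncLogger.Factory() {
      @Override
      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
          String journalId, InetSocketAddress addr) {
        // Swap in an instrumented channel here; the production factory
        // (IPCLoggerChannel.FACTORY, below) returns the real IPC channel.
        return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
      }
    };
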
AsyncLoggerSet.java:

@@ -25,7 +25,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;

IPCLoggerChannel.java:

@@ -65,7 +65,7 @@
 public class IPCLoggerChannel implements AsyncLogger {

   private final Configuration conf;
-  private final InetSocketAddress addr;
+  protected final InetSocketAddress addr;
   private QJournalProtocol proxy;

   private final ListeningExecutorService executor;
@@ -88,6 +88,15 @@ public class IPCLoggerChannel implements AsyncLogger {
    */
   private final int queueSizeLimitBytes;

+  static final Factory FACTORY = new AsyncLogger.Factory() {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
+    }
+  };
+
   public IPCLoggerChannel(Configuration conf,
       NamespaceInfo nsInfo,
       String journalId,
@@ -131,15 +140,18 @@ public void close() {
   protected QJournalProtocol getProxy() throws IOException {
     if (proxy != null) return proxy;
+    proxy = createProxy();
+    return proxy;
+  }
+
+  protected QJournalProtocol createProxy() throws IOException {
     RPC.setProtocolEngine(conf,
         QJournalProtocolPB.class, ProtobufRpcEngine.class);
     QJournalProtocolPB pbproxy = RPC.getProxy(
         QJournalProtocolPB.class,
         RPC.getProtocolVersion(QJournalProtocolPB.class),
         addr, conf);
-    proxy = new QJournalProtocolTranslatorPB(pbproxy);
-    return proxy;
+    return new QJournalProtocolTranslatorPB(pbproxy);
   }

   @Override

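Splitting createProxy() out of getProxy() keeps the proxy caching in one place while letting subclasses wrap or replace the underlying RPC proxy. A sketch of the pattern (hypothetical subclass; the commit's InvocationCountingChannel, further down, does the same thing with a Mockito Answer):

    class WrappingChannel extends IPCLoggerChannel {
      WrappingChannel(Configuration conf, NamespaceInfo nsInfo,
          String journalId, InetSocketAddress addr) {
        super(conf, nsInfo, journalId, addr);
      }

      @Override
      protected QJournalProtocol createProxy() throws IOException {
        QJournalProtocol real = super.createProxy();
        // Decorate 'real' here: count calls, inject faults, add latency, etc.
        return real;
      }
    }
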
QuorumJournalManager.java:

@@ -34,8 +34,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -78,14 +76,20 @@ public class QuorumJournalManager implements JournalManager {
   private final AsyncLoggerSet loggers;

-  public QuorumJournalManager(Configuration conf,
+  QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
+    this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+  }
+
+  QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo,
+      AsyncLogger.Factory loggerFactory) throws IOException {
     Preconditions.checkArgument(conf != null, "must be configured");

     this.conf = conf;
     this.uri = uri;
     this.nsInfo = nsInfo;
-    this.loggers = new AsyncLoggerSet(createLoggers());
+    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));

     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
@@ -106,6 +110,11 @@ public QuorumJournalManager(Configuration conf,
   }

+  protected List<AsyncLogger> createLoggers(
+      AsyncLogger.Factory factory) throws IOException {
+    return createLoggers(conf, uri, nsInfo, factory);
+  }
+
   static String parseJournalId(URI uri) {
     String path = uri.getPath();
     Preconditions.checkArgument(path != null && !path.isEmpty(),
@@ -234,17 +243,14 @@ public int compare(
     }
   };

-  protected List<AsyncLogger> createLoggers() throws IOException {
-    return createLoggers(conf, uri, nsInfo);
-  }
-
   static List<AsyncLogger> createLoggers(Configuration conf,
-      URI uri, NamespaceInfo nsInfo) throws IOException {
+      URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+      throws IOException {
     List<AsyncLogger> ret = Lists.newArrayList();
     List<InetSocketAddress> addrs = getLoggerAddresses(uri);
     String jid = parseJournalId(uri);
     for (InetSocketAddress addr : addrs) {
-      ret.add(new IPCLoggerChannel(conf, nsInfo, jid, addr));
+      ret.add(factory.createLogger(conf, nsInfo, jid, addr));
     }
     return ret;
   }

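The three-argument constructor now simply plugs in IPCLoggerChannel.FACTORY, while the four-argument overload is the injection point for tests in the same package. A sketch of both paths (journalUri, nsInfo, and testFactory are placeholders, not names from the commit):

    // Production path: real IPC channels behind the scenes.
    QuorumJournalManager qjm =
        new QuorumJournalManager(conf, journalUri, nsInfo);

    // Test path (same package): every logger comes from testFactory.
    QuorumJournalManager testQjm =
        new QuorumJournalManager(conf, journalUri, nsInfo, testFactory);
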
QJMTestUtil.java:

@@ -17,13 +17,31 @@
  */
 package org.apache.hadoop.hdfs.qjournal;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.DataOutputBuffer;

 public abstract class QJMTestUtil {
+  public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  public static final String JID = "test-journal";

   public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     DataOutputBuffer buf = new DataOutputBuffer();
@@ -38,4 +56,82 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     return Arrays.copyOf(buf.getData(), buf.getLength());
   }

+  public static void writeSegment(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, int startTxId, int numTxns,
+      boolean finalize) throws IOException {
+    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+    // Should create in-progress
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(startTxId));
+
+    writeTxns(stm, startTxId, numTxns);
+    if (finalize) {
+      stm.close();
+      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+    }
+  }
+
+  public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+    op.setTransactionId(txid);
+    stm.write(op);
+  }
+
+  public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+      throws IOException {
+    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+      writeOp(stm, txid);
+    }
+    stm.setReadyToFlush();
+    stm.flush();
+  }
+
+  /**
+   * Verify that the given list of streams contains exactly the range of
+   * transactions specified, inclusive.
+   */
+  public static void verifyEdits(List<EditLogInputStream> streams,
+      int firstTxnId, int lastTxnId) throws IOException {
+
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    assertTrue(iter.hasNext());
+    EditLogInputStream stream = iter.next();
+
+    for (int expected = firstTxnId;
+        expected <= lastTxnId;
+        expected++) {
+
+      FSEditLogOp op = stream.readOp();
+      while (op == null) {
+        assertTrue("Expected to find txid " + expected + ", " +
+            "but no more streams available to read from",
+            iter.hasNext());
+        stream = iter.next();
+        op = stream.readOp();
+      }
+
+      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+      assertEquals(expected, op.getTransactionId());
+    }
+
+    assertNull(stream.readOp());
+    assertFalse("Expected no more txns after " + lastTxnId +
+        " but more streams are available", iter.hasNext());
+  }
+
+  public static void assertExistsInQuorum(MiniJournalCluster cluster,
+      String fname) {
+    int count = 0;
+    for (int i = 0; i < 3; i++) {
+      File dir = cluster.getCurrentDir(i, JID);
+      if (new File(dir, fname).exists()) {
+        count++;
+      }
+    }
+    assertTrue("File " + fname + " should exist in a quorum of dirs",
+        count >= cluster.getQuorumSize());
+  }
 }

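The helpers above capture the write-then-verify round trip the functional tests repeat. Typical usage (a sketch assuming a running MiniJournalCluster and a QJM on the JID journal):

    // Write txns 1-3 as a finalized segment, then check they read back.
    writeSegment(cluster, qjm, 1, 3, true);
    List<EditLogInputStream> streams = Lists.newArrayList();
    qjm.selectInputStreams(streams, 1, false);
    verifyEdits(streams, 1, 3);
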
TestEpochsAreUnique.java:

@@ -28,7 +28,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
@@ -59,7 +58,8 @@ public void testSingleThreaded() throws IOException {
     // With no failures or contention, epochs should increase one-by-one
     for (int i = 0; i < 5; i++) {
       AsyncLoggerSet als = new AsyncLoggerSet(
-          QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO));
+          QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+              IPCLoggerChannel.FACTORY));
       als.createNewUniqueEpoch(FAKE_NSINFO);
       assertEquals(i + 1, als.getEpoch());
     }
@@ -69,7 +69,8 @@ public void testSingleThreaded() throws IOException {
     // skipping some
     for (int i = 0; i < 20; i++) {
       AsyncLoggerSet als = new AsyncLoggerSet(
-          makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)));
+          makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+              IPCLoggerChannel.FACTORY)));
       long newEpoch = -1;
       while (true) {
         try {

TestQJMWithFaults.java (new file):

@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+
+public class TestQJMWithFaults {
+  private static final Log LOG = LogFactory.getLog(
+      TestQJMWithFaults.class);
+
+  private static Configuration conf = new Configuration();
+  static {
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+  }
+
+  private static long MAX_IPC_NUMBER;
+
+  /**
+   * Run through the creation of a log without any faults injected,
+   * and count how many RPCs are made to each node. This sets the
+   * bounds for the other test cases, so they can exhaustively explore
+   * the space of potential failures.
+   */
+  @BeforeClass
+  public static void determineMaxIpcNumber() throws Exception {
+    Configuration conf = new Configuration();
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    try {
+      QuorumJournalManager qjm = createInjectableQJM(cluster);
+      doWorkload(cluster, qjm);
+
+      SortedSet<Integer> ipcCounts = Sets.newTreeSet();
+      for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+        InvocationCountingChannel ch = (InvocationCountingChannel)l;
+        ch.waitForAllPendingCalls();
+        ipcCounts.add(ch.getRpcCount());
+      }
+
+      // All of the loggers should have sent the same number of RPCs, since there
+      // were no failures.
+      assertEquals(1, ipcCounts.size());
+
+      MAX_IPC_NUMBER = ipcCounts.first();
+      LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sets up two of the nodes to each drop a single RPC, at all
+   * possible combinations of RPCs. This may result in the
+   * active writer failing to write. After this point, a new writer
+   * should be able to recover and continue writing without
+   * data loss.
+   */
+  @Test
+  public void testRecoverAfterDoubleFailures() throws Exception {
+    for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
+      for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
+        String injectionStr = "(" + failA + ", " + failB + ")";
+
+        LOG.info("\n\n-------------------------------------------\n" +
+            "Beginning test, failing at " + injectionStr + "\n" +
+            "-------------------------------------------\n\n");
+
+        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+          .build();
+        try {
+          QuorumJournalManager qjm;
+          qjm = createInjectableQJM(cluster);
+          List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
+          failIpcNumber(loggers.get(0), failA);
+          failIpcNumber(loggers.get(1), failB);
+          int lastAckedTxn = doWorkload(cluster, qjm);
+
+          if (lastAckedTxn < 6) {
+            LOG.info("Failed after injecting failures at " + injectionStr +
+                ". This is expected since we injected a failure in the " +
+                "majority.");
+          }
+
+          // Now should be able to recover
+          try {
+            qjm = createInjectableQJM(cluster);
+            qjm.recoverUnfinalizedSegments();
+            writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
+            // TODO: verify log segments
+          } catch (Throwable t) {
+            // Test failure! Rethrow with the test setup info so it can be
+            // easily triaged.
+            throw new RuntimeException("Test failed with injection: " + injectionStr,
+                t);
+          }
+        } finally {
+          cluster.shutdown();
+          cluster = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Run a simple workload of becoming the active writer and writing
+   * two log segments: 1-3 and 4-6.
+   */
+  private static int doWorkload(MiniJournalCluster cluster,
+      QuorumJournalManager qjm) throws IOException {
+    int lastAcked = 0;
+    try {
+      qjm.recoverUnfinalizedSegments();
+      writeSegment(cluster, qjm, 1, 3, true);
+      lastAcked = 3;
+      writeSegment(cluster, qjm, 4, 3, true);
+      lastAcked = 6;
+    } catch (QuorumException qe) {
+      LOG.info("Failed to write at txid " + lastAcked,
+          qe);
+    }
+    return lastAcked;
+  }
+
+  /**
+   * Inject a failure at the given IPC number, such that the JN never
+   * receives the RPC. The client side sees an IOException. Future
+   * IPCs after this number will be received as usual.
+   */
+  private void failIpcNumber(AsyncLogger logger, int idx) {
+    ((InvocationCountingChannel)logger).failIpcNumber(idx);
+  }
+
+  private static class InvocationCountingChannel extends IPCLoggerChannel {
+    private int rpcCount = 0;
+    private Map<Integer, Callable<Void>> injections = Maps.newHashMap();
+
+    public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      super(conf, nsInfo, journalId, addr);
+    }
+
+    int getRpcCount() {
+      return rpcCount;
+    }
+
+    void failIpcNumber(final int idx) {
+      Preconditions.checkArgument(idx > 0,
+          "id must be positive");
+      inject(idx, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          throw new IOException("injected failed IPC at " + idx);
+        }
+      });
+    }
+
+    private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
+      injections.put(beforeRpcNumber, injectedCode);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      final QJournalProtocol realProxy = super.createProxy();
+      QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+              rpcCount++;
+              String callStr = "[" + addr + "] " +
+                  invocation.getMethod().getName() + "(" +
+                  Joiner.on(", ").join(invocation.getArguments()) + ")";
+
+              Callable<Void> inject = injections.get(rpcCount);
+              if (inject != null) {
+                LOG.info("Injecting code before IPC #" + rpcCount + ": " +
+                    callStr);
+                inject.call();
+              } else {
+                LOG.info("IPC call #" + rpcCount + ": " + callStr);
+              }
+
+              return invocation.getMethod().invoke(realProxy,
+                  invocation.getArguments());
+            }
+          });
+      return mock;
+    }
+  }
+
+  private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+}

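failIpcNumber() is one use of the general inject() hook: any Callable can be registered to run just before a numbered RPC reaches the node. A sketch of another failure mode the same hook could express (a hypothetical method that could sit alongside failIpcNumber() inside InvocationCountingChannel; not part of the commit):

    void delayIpcNumber(final int idx, final long millis) {
      inject(idx, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          // Stall IPC #idx instead of failing it, to mimic a slow JournalNode.
          Thread.sleep(millis);
          return null;
        }
      });
    }
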
TestQuorumJournalManager.java:

@@ -18,14 +18,18 @@
 package org.apache.hadoop.hdfs.qjournal.client;

 import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;

 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,14 +37,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
@@ -55,7 +54,6 @@
 import org.mockito.Mockito;

 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;

 /**
  * Functional tests for QuorumJournalManager.
@@ -65,9 +63,6 @@ public class TestQuorumJournalManager {
   private static final Log LOG = LogFactory.getLog(
       TestQuorumJournalManager.class);

-  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
-      12345, "mycluster", "my-bp", 0L, 0);
-  private static final String JID = "testQuorumJournalManager";
   private MiniJournalCluster cluster;
   private Configuration conf;
   private QuorumJournalManager qjm;
@@ -95,18 +90,20 @@ public void setup() throws Exception {
   @After
   public void shutdown() throws IOException {
-    cluster.shutdown();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
   }

   @Test
   public void testSingleWriter() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);

     // Should be finalized
     checkRecovery(cluster, 1, 3);

     // Start a new segment
-    writeSegment(qjm, 4, 1, true);
+    writeSegment(cluster, qjm, 4, 1, true);

     // Should be finalized
     checkRecovery(cluster, 4, 4);
@@ -119,7 +116,7 @@ public void testReaderWhileAnotherWrites() throws Exception {
     List<EditLogInputStream> streams = Lists.newArrayList();
     readerQjm.selectInputStreams(streams, 0, false);
     assertEquals(0, streams.size());
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);

     readerQjm.selectInputStreams(streams, 0, false);
     try {
@@ -138,7 +135,7 @@ public void testReaderWhileAnotherWrites() throws Exception {

     // Ensure correct results when there is a stream in-progress, but we don't
     // ask for in-progress.
-    writeSegment(qjm, 4, 3, false);
+    writeSegment(cluster, qjm, 4, 3, false);
     readerQjm.selectInputStreams(streams, 0, false);
     try {
       assertEquals(1, streams.size());
@@ -178,13 +175,13 @@ public void testReaderWhileAnotherWrites() throws Exception {
    */
   @Test
   public void testOneJNMissingSegments() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(0).stopAndJoin(0);
-    writeSegment(qjm, 4, 3, true);
+    writeSegment(cluster, qjm, 4, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.restartJournalNode(0);
-    writeSegment(qjm, 7, 3, true);
+    writeSegment(cluster, qjm, 7, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(1).stopAndJoin(0);
@@ -199,37 +196,6 @@ public void testOneJNMissingSegments() throws Exception {
     }
   }

-  /**
-   * TODO: this test needs to be fleshed out to be an exhaustive failure test
-   * @throws Exception
-   */
-  @Test
-  public void testOrchestratedFailures() throws Exception {
-    writeSegment(qjm, 1, 3, true);
-    writeSegment(qjm, 4, 3, true);
-
-    SortedSet<Long> serials = Sets.newTreeSet();
-    for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
-      IPCLoggerChannel ch = (IPCLoggerChannel)l;
-      ch.waitForAllPendingCalls();
-      serials.add(ch.getNextIpcSerial());
-    }
-
-    // All of the loggers should have sent the same number of RPCs, since there
-    // were no failures.
-    assertEquals(1, serials.size());
-
-    long maxSerial = serials.first();
-    LOG.info("Max IPC serial = " + maxSerial);
-
-    cluster.shutdown();
-
-    cluster = new MiniJournalCluster.Builder(conf)
-      .build();
-    qjm = createSpyingQJM();
-    spies = qjm.getLoggerSetForTests().getLoggersForTests();
-  }
-
   /**
    * Test case where a new writer picks up from an old one with no failures
@@ -238,8 +204,8 @@ public void testOrchestratedFailures() throws Exception {
    */
   @Test
   public void testChangeWritersLogsInSync() throws Exception {
-    writeSegment(qjm, 1, 3, false);
-    assertExistsInQuorum(cluster,
+    writeSegment(cluster, qjm, 1, 3, false);
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));

     // Make a new QJM
@@ -301,7 +267,7 @@ private void doOutOfSyncTest(int missingOnRecoveryIdx,
           qe);
     }

-    assertExistsInQuorum(cluster,
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));

     // Shut down the specified JN, so it's not present during recovery.
@@ -389,7 +355,7 @@ public void testRecoverAfterIncompleteRecovery() throws Exception {
   @Test
   public void testPurgeLogs() throws Exception {
     for (int txid = 1; txid <= 5; txid++) {
-      writeSegment(qjm, txid, 1, true);
+      writeSegment(cluster, qjm, txid, 1, true);
     }
     File curDir = cluster.getCurrentDir(0, JID);
     GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
@@ -428,78 +394,18 @@ public void testPurgeLogs() throws Exception {

   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
-    return new QuorumJournalManager(
-        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
-      @Override
-      protected List<AsyncLogger> createLoggers() throws IOException {
-        List<AsyncLogger> realLoggers = super.createLoggers();
-        List<AsyncLogger> spies = Lists.newArrayList();
-        for (AsyncLogger logger : realLoggers) {
-          spies.add(Mockito.spy(logger));
-        }
-        return spies;
-      }
-    };
-  }
-
-  private void writeSegment(QuorumJournalManager qjm,
-      int startTxId, int numTxns, boolean finalize) throws IOException {
-    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
-    // Should create in-progress
-    assertExistsInQuorum(cluster,
-        NNStorage.getInProgressEditsFileName(startTxId));
-
-    writeTxns(stm, startTxId, numTxns);
-    if (finalize) {
-      stm.close();
-      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
-    }
-  }
-
-  private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
-      throws IOException {
-    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
-      TestQuorumJournalManagerUnit.writeOp(stm, txid);
-    }
-    stm.setReadyToFlush();
-    stm.flush();
-  }
-
-  /**
-   * Verify that the given list of streams contains exactly the range of
-   * transactions specified, inclusive.
-   */
-  private void verifyEdits(List<EditLogInputStream> streams,
-      int firstTxnId, int lastTxnId) throws IOException {
-
-    Iterator<EditLogInputStream> iter = streams.iterator();
-    assertTrue(iter.hasNext());
-    EditLogInputStream stream = iter.next();
-
-    for (int expected = firstTxnId;
-        expected <= lastTxnId;
-        expected++) {
-
-      FSEditLogOp op = stream.readOp();
-      while (op == null) {
-        assertTrue("Expected to find txid " + expected + ", " +
-            "but no more streams available to read from",
-            iter.hasNext());
-        stream = iter.next();
-        op = stream.readOp();
-      }
-
-      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
-      assertEquals(expected, op.getTransactionId());
-    }
-
-    assertNull(stream.readOp());
-    assertFalse("Expected no more txns after " + lastTxnId +
-        " but more streams are available", iter.hasNext());
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
+            conf, nsInfo, journalId, addr));
+      }
+    };
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
   }

   private static void waitForAllPendingCalls(AsyncLoggerSet als)
       throws InterruptedException {
     for (AsyncLogger l : als.getLoggersForTests()) {
@@ -508,19 +414,6 @@ private static void waitForAllPendingCalls(AsyncLoggerSet als)
     }
   }

-  private void assertExistsInQuorum(MiniJournalCluster cluster,
-      String fname) {
-    int count = 0;
-    for (int i = 0; i < 3; i++) {
-      File dir = cluster.getCurrentDir(i, JID);
-      if (new File(dir, fname).exists()) {
-        count++;
-      }
-    }
-    assertTrue("File " + fname + " should exist in a quorum of dirs",
-        count >= cluster.getQuorumSize());
-  }
-
   private void checkRecovery(MiniJournalCluster cluster,
       long segmentTxId, long expectedEndTxId)
       throws IOException {

TestQuorumJournalManagerUnit.java:

@@ -32,8 +32,6 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -47,6 +45,8 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;

+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+
 /**
  * True unit tests for QuorumJournalManager
  */
@@ -70,7 +70,7 @@ public void setup() throws Exception {
     qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
       @Override
-      protected List<AsyncLogger> createLoggers() {
+      protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
         return spyLoggers;
       }
     };
@@ -192,10 +192,4 @@ private EditLogOutputStream createLogSegment() throws IOException {
     EditLogOutputStream stm = qjm.startLogSegment(1);
     return stm;
   }
-
-  static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
-    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
-    op.setTransactionId(txid);
-    stm.write(op);
-  }
 }