HDFS-4304. Make FSEditLogOp.MAX_OP_SIZE configurable. Contributed by Colin Patrick McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1449218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8271fae921
commit
019d6a22b1
@ -309,9 +309,13 @@ Release 2.0.4-beta - UNRELEASED
|
|||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
|
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
|
||||||
configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
|
configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
|
||||||
|
|
||||||
|
HDFS-4304. Make FSEditLogOp.MAX_OP_SIZE configurable. (Colin Patrick
|
||||||
|
McCabe via atm)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -163,6 +163,11 @@ public String toString() {
|
|||||||
return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
|
return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
reader.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Input stream implementation which can be used by
|
* Input stream implementation which can be used by
|
||||||
* FSEditLogOp.Reader
|
* FSEditLogOp.Reader
|
||||||
|
@ -392,6 +392,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final int DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT = 1;
|
public static final int DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT = 1;
|
||||||
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
|
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
|
||||||
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
|
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
|
||||||
|
public static final String DFS_NAMENODE_MAX_OP_SIZE_KEY = "dfs.namenode.max.op.size";
|
||||||
|
public static final int DFS_NAMENODE_MAX_OP_SIZE_DEFAULT = 50 * 1024 * 1024;
|
||||||
|
|
||||||
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
|
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
|
||||||
|
|
||||||
|
@ -142,4 +142,9 @@ public long getLastTxId() {
|
|||||||
public boolean isInProgress() {
|
public boolean isInProgress() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
reader.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
|
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
|
||||||
@ -53,6 +54,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
|||||||
private final long firstTxId;
|
private final long firstTxId;
|
||||||
private final long lastTxId;
|
private final long lastTxId;
|
||||||
private final boolean isInProgress;
|
private final boolean isInProgress;
|
||||||
|
private int maxOpSize;
|
||||||
static private enum State {
|
static private enum State {
|
||||||
UNINIT,
|
UNINIT,
|
||||||
OPEN,
|
OPEN,
|
||||||
@ -118,6 +120,7 @@ private EditLogFileInputStream(LogSource log,
|
|||||||
this.firstTxId = firstTxId;
|
this.firstTxId = firstTxId;
|
||||||
this.lastTxId = lastTxId;
|
this.lastTxId = lastTxId;
|
||||||
this.isInProgress = isInProgress;
|
this.isInProgress = isInProgress;
|
||||||
|
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init() throws LogHeaderCorruptException, IOException {
|
private void init() throws LogHeaderCorruptException, IOException {
|
||||||
@ -134,6 +137,7 @@ private void init() throws LogHeaderCorruptException, IOException {
|
|||||||
throw new LogHeaderCorruptException("No header found in log");
|
throw new LogHeaderCorruptException("No header found in log");
|
||||||
}
|
}
|
||||||
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
|
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
|
||||||
|
reader.setMaxOpSize(maxOpSize);
|
||||||
state = State.OPEN;
|
state = State.OPEN;
|
||||||
} finally {
|
} finally {
|
||||||
if (reader == null) {
|
if (reader == null) {
|
||||||
@ -413,4 +417,11 @@ public String getName() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
this.maxOpSize = maxOpSize;
|
||||||
|
if (reader != null) {
|
||||||
|
reader.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,4 +165,9 @@ public boolean skipUntil(long txid) throws IOException {
|
|||||||
* Return true if this stream is in progress, false if it is finalized.
|
* Return true if this stream is in progress, false if it is finalized.
|
||||||
*/
|
*/
|
||||||
public abstract boolean isInProgress();
|
public abstract boolean isInProgress();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the maximum opcode size in bytes.
|
||||||
|
*/
|
||||||
|
public abstract void setMaxOpSize(int maxOpSize);
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import org.apache.hadoop.hdfs.util.XMLUtils;
|
import org.apache.hadoop.hdfs.util.XMLUtils;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DeprecatedUTF8;
|
import org.apache.hadoop.hdfs.DeprecatedUTF8;
|
||||||
import org.xml.sax.ContentHandler;
|
import org.xml.sax.ContentHandler;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
@ -75,11 +76,6 @@
|
|||||||
public abstract class FSEditLogOp {
|
public abstract class FSEditLogOp {
|
||||||
public final FSEditLogOpCodes opCode;
|
public final FSEditLogOpCodes opCode;
|
||||||
long txid;
|
long txid;
|
||||||
/**
|
|
||||||
* Opcode size is limited to 1.5 megabytes
|
|
||||||
*/
|
|
||||||
public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
|
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
final public static class OpInstanceCache {
|
final public static class OpInstanceCache {
|
||||||
@ -2246,6 +2242,7 @@ public static class Reader {
|
|||||||
private final int logVersion;
|
private final int logVersion;
|
||||||
private final Checksum checksum;
|
private final Checksum checksum;
|
||||||
private final OpInstanceCache cache;
|
private final OpInstanceCache cache;
|
||||||
|
private int maxOpSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct the reader
|
* Construct the reader
|
||||||
@ -2253,7 +2250,8 @@ public static class Reader {
|
|||||||
* @param logVersion The version of the data coming from the stream.
|
* @param logVersion The version of the data coming from the stream.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
|
public Reader(DataInputStream in, StreamLimiter limiter,
|
||||||
|
int logVersion) {
|
||||||
this.logVersion = logVersion;
|
this.logVersion = logVersion;
|
||||||
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
|
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
|
||||||
this.checksum = new PureJavaCrc32();
|
this.checksum = new PureJavaCrc32();
|
||||||
@ -2269,6 +2267,11 @@ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
|
|||||||
}
|
}
|
||||||
this.limiter = limiter;
|
this.limiter = limiter;
|
||||||
this.cache = new OpInstanceCache();
|
this.cache = new OpInstanceCache();
|
||||||
|
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
this.maxOpSize = maxOpSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2363,8 +2366,8 @@ private void verifyTerminator() throws IOException {
|
|||||||
* problematic byte. This usually means the beginning of the opcode.
|
* problematic byte. This usually means the beginning of the opcode.
|
||||||
*/
|
*/
|
||||||
private FSEditLogOp decodeOp() throws IOException {
|
private FSEditLogOp decodeOp() throws IOException {
|
||||||
limiter.setLimit(MAX_OP_SIZE);
|
limiter.setLimit(maxOpSize);
|
||||||
in.mark(MAX_OP_SIZE);
|
in.mark(maxOpSize);
|
||||||
|
|
||||||
if (checksum != null) {
|
if (checksum != null) {
|
||||||
checksum.reset();
|
checksum.reset();
|
||||||
|
@ -607,6 +607,12 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
|
|||||||
editStreams = FSImagePreTransactionalStorageInspector
|
editStreams = FSImagePreTransactionalStorageInspector
|
||||||
.getEditLogStreams(storage);
|
.getEditLogStreams(storage);
|
||||||
}
|
}
|
||||||
|
int maxOpSize = conf.getInt(DFSConfigKeys.
|
||||||
|
DFS_NAMENODE_MAX_OP_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
|
||||||
|
for (EditLogInputStream elis : editStreams) {
|
||||||
|
elis.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
|
|
||||||
LOG.debug("Planning to load image :\n" + imageFile);
|
LOG.debug("Planning to load image :\n" + imageFile);
|
||||||
for (EditLogInputStream l : editStreams) {
|
for (EditLogInputStream l : editStreams) {
|
||||||
|
@ -267,4 +267,11 @@ static private final class PrematureEOFException extends IOException {
|
|||||||
super(msg);
|
super(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
for (EditLogInputStream elis : streams) {
|
||||||
|
elis.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -861,6 +861,11 @@ public String getName() {
|
|||||||
public boolean isInProgress() {
|
public boolean isInProgress() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxOpSize(int maxOpSize) {
|
||||||
|
reader.setMaxOpSize(maxOpSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -83,6 +83,7 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
|
|||||||
elfos.close();
|
elfos.close();
|
||||||
elfos = null;
|
elfos = null;
|
||||||
elfis = new EditLogFileInputStream(TEST_LOG_NAME);
|
elfis = new EditLogFileInputStream(TEST_LOG_NAME);
|
||||||
|
elfis.setMaxOpSize(elts.getMaxOpSize());
|
||||||
|
|
||||||
// reading through normally will get you an exception
|
// reading through normally will get you an exception
|
||||||
Set<Long> validTxIds = elts.getValidTxIds();
|
Set<Long> validTxIds = elts.getValidTxIds();
|
||||||
@ -143,7 +144,7 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
|
|||||||
/**
|
/**
|
||||||
* A test scenario for the edit log
|
* A test scenario for the edit log
|
||||||
*/
|
*/
|
||||||
private interface EditLogTestSetup {
|
private static abstract class EditLogTestSetup {
|
||||||
/**
|
/**
|
||||||
* Set up the edit log.
|
* Set up the edit log.
|
||||||
*/
|
*/
|
||||||
@ -162,6 +163,13 @@ abstract public void addTransactionsToLog(EditLogOutputStream elos,
|
|||||||
* edit log.
|
* edit log.
|
||||||
**/
|
**/
|
||||||
abstract public Set<Long> getValidTxIds();
|
abstract public Set<Long> getValidTxIds();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the maximum opcode size we will use for input.
|
||||||
|
*/
|
||||||
|
public int getMaxOpSize() {
|
||||||
|
return DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void padEditLog(EditLogOutputStream elos, int paddingLength)
|
static void padEditLog(EditLogOutputStream elos, int paddingLength)
|
||||||
@ -182,10 +190,10 @@ static void padEditLog(EditLogOutputStream elos, int paddingLength)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void addDeleteOpcode(EditLogOutputStream elos,
|
static void addDeleteOpcode(EditLogOutputStream elos,
|
||||||
OpInstanceCache cache) throws IOException {
|
OpInstanceCache cache, long txId, String path) throws IOException {
|
||||||
DeleteOp op = DeleteOp.getInstance(cache);
|
DeleteOp op = DeleteOp.getInstance(cache);
|
||||||
op.setTransactionId(0x0);
|
op.setTransactionId(txId);
|
||||||
op.setPath("/foo");
|
op.setPath(path);
|
||||||
op.setTimestamp(0);
|
op.setTimestamp(0);
|
||||||
elos.write(op);
|
elos.write(op);
|
||||||
}
|
}
|
||||||
@ -198,7 +206,7 @@ static void addDeleteOpcode(EditLogOutputStream elos,
|
|||||||
* able to handle any amount of padding (including no padding) without
|
* able to handle any amount of padding (including no padding) without
|
||||||
* throwing an exception.
|
* throwing an exception.
|
||||||
*/
|
*/
|
||||||
private static class EltsTestEmptyLog implements EditLogTestSetup {
|
private static class EltsTestEmptyLog extends EditLogTestSetup {
|
||||||
private int paddingLength;
|
private int paddingLength;
|
||||||
|
|
||||||
public EltsTestEmptyLog(int paddingLength) {
|
public EltsTestEmptyLog(int paddingLength) {
|
||||||
@ -242,6 +250,42 @@ public void testEmptyExtraPaddedLog() throws IOException {
|
|||||||
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
|
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test using a non-default maximum opcode length.
|
||||||
|
*/
|
||||||
|
private static class EltsTestNonDefaultMaxOpSize extends EditLogTestSetup {
|
||||||
|
public EltsTestNonDefaultMaxOpSize() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addTransactionsToLog(EditLogOutputStream elos,
|
||||||
|
OpInstanceCache cache) throws IOException {
|
||||||
|
addDeleteOpcode(elos, cache, 0, "/foo");
|
||||||
|
addDeleteOpcode(elos, cache, 1,
|
||||||
|
"/supercalifragalisticexpialadocius.supercalifragalisticexpialadocius");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLastValidTxId() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Long> getValidTxIds() {
|
||||||
|
return Sets.newHashSet(0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxOpSize() {
|
||||||
|
return 30;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Test an empty edit log with extra-long padding */
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testNonDefaultMaxOpSize() throws IOException {
|
||||||
|
runEditLogTest(new EltsTestNonDefaultMaxOpSize());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the scenario where an edit log contains some padding (0xff) bytes
|
* Test the scenario where an edit log contains some padding (0xff) bytes
|
||||||
* followed by valid opcode data.
|
* followed by valid opcode data.
|
||||||
@ -249,7 +293,7 @@ public void testEmptyExtraPaddedLog() throws IOException {
|
|||||||
* These edit logs are corrupt, but all the opcodes should be recoverable
|
* These edit logs are corrupt, but all the opcodes should be recoverable
|
||||||
* with recovery mode.
|
* with recovery mode.
|
||||||
*/
|
*/
|
||||||
private static class EltsTestOpcodesAfterPadding implements EditLogTestSetup {
|
private static class EltsTestOpcodesAfterPadding extends EditLogTestSetup {
|
||||||
private int paddingLength;
|
private int paddingLength;
|
||||||
|
|
||||||
public EltsTestOpcodesAfterPadding(int paddingLength) {
|
public EltsTestOpcodesAfterPadding(int paddingLength) {
|
||||||
@ -260,7 +304,7 @@ public EltsTestOpcodesAfterPadding(int paddingLength) {
|
|||||||
public void addTransactionsToLog(EditLogOutputStream elos,
|
public void addTransactionsToLog(EditLogOutputStream elos,
|
||||||
OpInstanceCache cache) throws IOException {
|
OpInstanceCache cache) throws IOException {
|
||||||
padEditLog(elos, paddingLength);
|
padEditLog(elos, paddingLength);
|
||||||
addDeleteOpcode(elos, cache);
|
addDeleteOpcode(elos, cache, 0, "/foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -286,7 +330,7 @@ public void testOpcodesAfterExtraPadding() throws IOException {
|
|||||||
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
|
3 * EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class EltsTestGarbageInEditLog implements EditLogTestSetup {
|
private static class EltsTestGarbageInEditLog extends EditLogTestSetup {
|
||||||
final private long BAD_TXID = 4;
|
final private long BAD_TXID = 4;
|
||||||
final private long MAX_TXID = 10;
|
final private long MAX_TXID = 10;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user