HDFS-13596. NN restart fails after RollingUpgrade from 2.x to 3.x. Contributed by Fei Hui.
This commit is contained in:
parent
84b1982060
commit
abc8fde4ca
@ -433,7 +433,7 @@ public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
|
||||
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
|
||||
"startLogSegment(" + txId + ")");
|
||||
return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
|
||||
writeTxnsTimeoutMs);
|
||||
writeTxnsTimeoutMs, layoutVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -36,17 +36,18 @@ class QuorumOutputStream extends EditLogOutputStream {
|
||||
|
||||
public QuorumOutputStream(AsyncLoggerSet loggers,
|
||||
long txId, int outputBufferCapacity,
|
||||
int writeTimeoutMs) throws IOException {
|
||||
int writeTimeoutMs, int logVersion) throws IOException {
|
||||
super();
|
||||
this.buf = new EditsDoubleBuffer(outputBufferCapacity);
|
||||
this.loggers = loggers;
|
||||
this.segmentTxId = txId;
|
||||
this.writeTimeoutMs = writeTimeoutMs;
|
||||
setCurrentLogVersion(logVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(FSEditLogOp op) throws IOException {
|
||||
buf.writeOp(op);
|
||||
buf.writeOp(op, getCurrentLogVersion());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,7 +59,7 @@ public boolean hasSomeData() {
|
||||
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
|
||||
throws IOException {
|
||||
EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
|
||||
journalInfo);
|
||||
journalInfo, layoutVersion);
|
||||
stm.startLogSegment(txId);
|
||||
return stm;
|
||||
}
|
||||
|
@ -54,8 +54,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||
private EditsDoubleBuffer doubleBuf;
|
||||
|
||||
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
|
||||
JournalInfo journalInfo) // active name-node
|
||||
throws IOException {
|
||||
JournalInfo journalInfo, int logVersion) // active name-node
|
||||
throws IOException {
|
||||
super();
|
||||
this.bnRegistration = bnReg;
|
||||
this.journalInfo = journalInfo;
|
||||
@ -71,11 +71,12 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||
}
|
||||
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
|
||||
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
|
||||
setCurrentLogVersion(logVersion);
|
||||
}
|
||||
|
||||
@Override // EditLogOutputStream
|
||||
public void write(FSEditLogOp op) throws IOException {
|
||||
doubleBuf.writeOp(op);
|
||||
doubleBuf.writeOp(op, getCurrentLogVersion());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -90,6 +91,7 @@ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
|
||||
public void create(int layoutVersion) throws IOException {
|
||||
assert doubleBuf.isFlushed() : "previous data is not flushed yet";
|
||||
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
|
||||
setCurrentLogVersion(layoutVersion);
|
||||
}
|
||||
|
||||
@Override // EditLogOutputStream
|
||||
|
@ -95,7 +95,7 @@ public EditLogFileOutputStream(Configuration conf, File name, int size)
|
||||
|
||||
@Override
|
||||
public void write(FSEditLogOp op) throws IOException {
|
||||
doubleBuf.writeOp(op);
|
||||
doubleBuf.writeOp(op, getCurrentLogVersion());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -121,6 +121,7 @@ public void create(int layoutVersion) throws IOException {
|
||||
writeHeader(layoutVersion, doubleBuf.getCurrentBuf());
|
||||
setReadyToFlush();
|
||||
flush();
|
||||
setCurrentLogVersion(layoutVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -35,6 +35,8 @@ public abstract class EditLogOutputStream implements Closeable {
|
||||
// these are statistics counters
|
||||
private long numSync; // number of sync(s) to disk
|
||||
private long totalTimeSync; // total time to sync
|
||||
// The version of the current edit log
|
||||
private int currentLogVersion;
|
||||
|
||||
public EditLogOutputStream() throws IOException {
|
||||
numSync = totalTimeSync = 0;
|
||||
@ -147,4 +149,18 @@ protected long getNumSync() {
|
||||
public String generateReport() {
|
||||
return toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The version of the current edit log
|
||||
*/
|
||||
public int getCurrentLogVersion() {
|
||||
return currentLogVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param logVersion The version of the current edit log
|
||||
*/
|
||||
public void setCurrentLogVersion(int logVersion) {
|
||||
this.currentLogVersion = logVersion;
|
||||
}
|
||||
}
|
||||
|
@ -56,9 +56,9 @@ public EditsDoubleBuffer(int defaultBufferSize) {
|
||||
bufReady = new TxnBuffer(initBufferSize);
|
||||
|
||||
}
|
||||
|
||||
public void writeOp(FSEditLogOp op) throws IOException {
|
||||
bufCurrent.writeOp(op);
|
||||
|
||||
public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
|
||||
bufCurrent.writeOp(op, logVersion);
|
||||
}
|
||||
|
||||
public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
|
||||
@ -149,13 +149,13 @@ public TxnBuffer(int initBufferSize) {
|
||||
reset();
|
||||
}
|
||||
|
||||
public void writeOp(FSEditLogOp op) throws IOException {
|
||||
public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
|
||||
if (firstTxId == HdfsServerConstants.INVALID_TXID) {
|
||||
firstTxId = op.txid;
|
||||
} else {
|
||||
assert op.txid > firstTxId;
|
||||
}
|
||||
writer.writeOp(op);
|
||||
writer.writeOp(op, logVersion);
|
||||
numTxns++;
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,9 @@ private enum State {
|
||||
|
||||
//initialize
|
||||
private JournalSet journalSet = null;
|
||||
private EditLogOutputStream editLogStream = null;
|
||||
|
||||
@VisibleForTesting
|
||||
EditLogOutputStream editLogStream = null;
|
||||
|
||||
// a monotonically increasing counter that represents transactionIds.
|
||||
// All of the threads which update/increment txid are synchronized,
|
||||
|
@ -282,6 +282,11 @@ abstract void readFields(DataInputStream in, int logVersion)
|
||||
public abstract void writeFields(DataOutputStream out)
|
||||
throws IOException;
|
||||
|
||||
public void writeFields(DataOutputStream out, int logVersion)
|
||||
throws IOException {
|
||||
writeFields(out);
|
||||
}
|
||||
|
||||
static interface BlockListUpdatingOp {
|
||||
Block[] getBlocks();
|
||||
String getPath();
|
||||
@ -546,6 +551,12 @@ <T extends AddCloseOp> T setErasureCodingPolicyId(byte ecPolicyId) {
|
||||
|
||||
@Override
|
||||
public void writeFields(DataOutputStream out) throws IOException {
|
||||
throw new IOException("Unsupported without logversion");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFields(DataOutputStream out, int logVersion)
|
||||
throws IOException {
|
||||
FSImageSerialization.writeLong(inodeId, out);
|
||||
FSImageSerialization.writeString(path, out);
|
||||
FSImageSerialization.writeShort(replication, out);
|
||||
@ -564,7 +575,10 @@ public void writeFields(DataOutputStream out) throws IOException {
|
||||
FSImageSerialization.writeString(clientMachine,out);
|
||||
FSImageSerialization.writeBoolean(overwrite, out);
|
||||
FSImageSerialization.writeByte(storagePolicyId, out);
|
||||
FSImageSerialization.writeByte(erasureCodingPolicyId, out);
|
||||
if (NameNodeLayoutVersion.supports(
|
||||
NameNodeLayoutVersion.Feature.ERASURE_CODING, logVersion)) {
|
||||
FSImageSerialization.writeByte(erasureCodingPolicyId, out);
|
||||
}
|
||||
// write clientId and callId
|
||||
writeRpcIds(rpcClientId, rpcCallId, out);
|
||||
}
|
||||
@ -4854,16 +4868,18 @@ public Writer(DataOutputBuffer out) {
|
||||
* Write an operation to the output stream
|
||||
*
|
||||
* @param op The operation to write
|
||||
* @param logVersion The version of edit log
|
||||
* @throws IOException if an error occurs during writing.
|
||||
*/
|
||||
public void writeOp(FSEditLogOp op) throws IOException {
|
||||
public void writeOp(FSEditLogOp op, int logVersion)
|
||||
throws IOException {
|
||||
int start = buf.getLength();
|
||||
// write the op code first to make padding and terminator verification
|
||||
// work
|
||||
buf.writeByte(op.opCode.getOpCode());
|
||||
buf.writeInt(0); // write 0 for the length first
|
||||
buf.writeLong(op.txid);
|
||||
op.writeFields(buf);
|
||||
op.writeFields(buf, logVersion);
|
||||
int end = buf.getLength();
|
||||
|
||||
// write the length back: content of the op + 4 bytes checksum - op_code
|
||||
|
@ -2527,6 +2527,7 @@ private HdfsFileStatus startFileInt(String src,
|
||||
final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
|
||||
.getErasureCodingPolicy(this, ecPolicyName, iip);
|
||||
if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
|
||||
checkErasureCodingSupported("createWithEC");
|
||||
if (blockSize < ecPolicy.getCellSize()) {
|
||||
throw new IOException("Specified block size (" + blockSize
|
||||
+ ") is less than the cell size (" + ecPolicy.getCellSize()
|
||||
@ -7597,6 +7598,7 @@ void setErasureCodingPolicy(final String srcArg, final String ecPolicyName,
|
||||
UnresolvedLinkException, SafeModeException, AccessControlException {
|
||||
final String operationName = "setErasureCodingPolicy";
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
FileStatus resultingStat = null;
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
boolean success = false;
|
||||
@ -7629,6 +7631,7 @@ AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
|
||||
final String operationName = "addErasureCodingPolicies";
|
||||
List<String> addECPolicyNames = new ArrayList<>(policies.length);
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
List<AddErasureCodingPolicyResponse> responses =
|
||||
new ArrayList<>(policies.length);
|
||||
boolean success = false;
|
||||
@ -7670,6 +7673,7 @@ void removeErasureCodingPolicy(String ecPolicyName,
|
||||
final boolean logRetryCache) throws IOException {
|
||||
final String operationName = "removeErasureCodingPolicy";
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
@ -7700,6 +7704,7 @@ boolean enableErasureCodingPolicy(String ecPolicyName,
|
||||
final boolean logRetryCache) throws IOException {
|
||||
final String operationName = "enableErasureCodingPolicy";
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
@ -7731,6 +7736,7 @@ boolean disableErasureCodingPolicy(String ecPolicyName,
|
||||
final boolean logRetryCache) throws IOException {
|
||||
final String operationName = "disableErasureCodingPolicy";
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
boolean success = false;
|
||||
LOG.info("Disable the erasure coding policy " + ecPolicyName);
|
||||
writeLock();
|
||||
@ -7764,6 +7770,7 @@ void unsetErasureCodingPolicy(final String srcArg,
|
||||
UnresolvedLinkException, SafeModeException, AccessControlException {
|
||||
final String operationName = "unsetErasureCodingPolicy";
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkErasureCodingSupported(operationName);
|
||||
FileStatus resultingStat = null;
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
boolean success = false;
|
||||
@ -7791,6 +7798,7 @@ ErasureCodingPolicy getErasureCodingPolicy(String src)
|
||||
final String operationName = "getErasureCodingPolicy";
|
||||
boolean success = false;
|
||||
checkOperation(OperationCategory.READ);
|
||||
checkErasureCodingSupported(operationName);
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
readLock();
|
||||
try {
|
||||
@ -7812,6 +7820,7 @@ ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException {
|
||||
final String operationName = "getErasureCodingPolicies";
|
||||
boolean success = false;
|
||||
checkOperation(OperationCategory.READ);
|
||||
checkErasureCodingSupported(operationName);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
@ -7832,6 +7841,7 @@ Map<String, String> getErasureCodingCodecs() throws IOException {
|
||||
final String operationName = "getErasureCodingCodecs";
|
||||
boolean success = false;
|
||||
checkOperation(OperationCategory.READ);
|
||||
checkErasureCodingSupported(operationName);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
@ -8224,5 +8234,19 @@ String getFailedStorageCommand(String mode) {
|
||||
return "disableRestoreFailedStorage";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether operation is supported.
|
||||
* @param operationName the name of operation.
|
||||
* @throws UnsupportedActionException throws UAE if not supported.
|
||||
*/
|
||||
public void checkErasureCodingSupported(String operationName)
|
||||
throws UnsupportedActionException {
|
||||
if (!NameNodeLayoutVersion.supports(
|
||||
NameNodeLayoutVersion.Feature.ERASURE_CODING,
|
||||
getEffectiveLayoutVersion())) {
|
||||
throw new UnsupportedActionException(operationName + " not supported.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
|
||||
for (long txid = startTxn; txid < startTxn + numTxns; txid++) {
|
||||
FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
|
||||
op.setTransactionId(txid);
|
||||
writer.writeOp(op);
|
||||
writer.writeOp(op, FAKE_NSINFO.getLayoutVersion());
|
||||
}
|
||||
|
||||
return Arrays.copyOf(buf.getData(), buf.getLength());
|
||||
@ -73,7 +73,7 @@ public static byte[] createGabageTxns(long startTxId, int numTxns)
|
||||
for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
|
||||
FSEditLogOp op = new TestEditLog.GarbageMkdirOp();
|
||||
op.setTransactionId(txid);
|
||||
writer.writeOp(op);
|
||||
writer.writeOp(op, FAKE_NSINFO.getLayoutVersion());
|
||||
}
|
||||
return Arrays.copyOf(buf.getData(), buf.getLength());
|
||||
}
|
||||
|
@ -64,13 +64,16 @@
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
@ -1710,4 +1713,77 @@ public void testReadActivelyUpdatedLog() throws Exception {
|
||||
LogManager.getRootLogger().removeAppender(appender);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test edits can be writen and read without ErasureCoding supported.
|
||||
*/
|
||||
@Test
|
||||
public void testEditLogWithoutErasureCodingSupported()
|
||||
throws IOException {
|
||||
Configuration conf = getConf();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
// ERASURECODING not supported
|
||||
int logVersion = -61;
|
||||
assertFalse(NameNodeLayoutVersion.supports(
|
||||
NameNodeLayoutVersion.Feature.ERASURE_CODING, logVersion));
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
||||
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||
FSImage fsimage = namesystem.getFSImage();
|
||||
FileSystem fileSys = cluster.getFileSystem();
|
||||
final FSEditLog editLog = fsimage.getEditLog();
|
||||
editLog.editLogStream.setCurrentLogVersion(logVersion);
|
||||
// Write new version edit log
|
||||
long txid = editLog.rollEditLog(logVersion);
|
||||
|
||||
String testDir = "/test";
|
||||
String testFile = "testfile_001";
|
||||
String testFilePath = testDir + "/" + testFile;
|
||||
|
||||
fileSys.mkdirs(new Path(testDir), new FsPermission("755"));
|
||||
|
||||
// Create a file
|
||||
Path p = new Path(testFilePath);
|
||||
DFSTestUtil.createFile(fileSys, p, 0, (short) 1, 1);
|
||||
|
||||
long blkId = 1;
|
||||
long blkNumBytes = 1024;
|
||||
long timestamp = 1426222918;
|
||||
// Add a block to the file
|
||||
BlockInfoContiguous blockInfo =
|
||||
new BlockInfoContiguous(
|
||||
new Block(blkId, blkNumBytes, timestamp),
|
||||
(short)1);
|
||||
INodeFile file
|
||||
= (INodeFile)namesystem.getFSDirectory().getINode(testFilePath);
|
||||
file.addBlock(blockInfo);
|
||||
file.toUnderConstruction("testClient", "testMachine");
|
||||
|
||||
// Write edit log
|
||||
editLog.logAddBlock(testFilePath, file);
|
||||
editLog.rollEditLog(logVersion);
|
||||
|
||||
// Read edit log
|
||||
Collection<EditLogInputStream> editStreams
|
||||
= editLog.selectInputStreams(txid, txid + 1);
|
||||
EditLogInputStream inputStream = null;
|
||||
for (EditLogInputStream s : editStreams) {
|
||||
if (s.getFirstTxId() == txid) {
|
||||
inputStream = s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull(inputStream);
|
||||
int readLogVersion = inputStream.getVersion(false);
|
||||
assertEquals(logVersion, readLogVersion);
|
||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
|
||||
long records = loader.loadFSEdits(inputStream, txid);
|
||||
assertTrue(records > 0);
|
||||
|
||||
editLog.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
@ -87,6 +87,9 @@ public void shouldFailToCloseWhenUnflushed() throws IOException {
|
||||
@Test
|
||||
public void testDumpEdits() throws IOException {
|
||||
final int defaultBufferSize = 256;
|
||||
final int fakeLogVersion =
|
||||
NameNodeLayoutVersion.Feature.ROLLING_UPGRADE
|
||||
.getInfo().getLayoutVersion();
|
||||
EditsDoubleBuffer buffer = new EditsDoubleBuffer(defaultBufferSize);
|
||||
FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
|
||||
|
||||
@ -98,7 +101,7 @@ public void testDumpEdits() throws IOException {
|
||||
.setPath(src)
|
||||
.setReplication(replication);
|
||||
op.setTransactionId(1);
|
||||
buffer.writeOp(op);
|
||||
buffer.writeOp(op, fakeLogVersion);
|
||||
|
||||
src = "/testdumpedits2";
|
||||
|
||||
@ -107,13 +110,13 @@ public void testDumpEdits() throws IOException {
|
||||
.setPath(src)
|
||||
.setTimestamp(0);
|
||||
op2.setTransactionId(2);
|
||||
buffer.writeOp(op2);
|
||||
buffer.writeOp(op2, fakeLogVersion);
|
||||
|
||||
FSEditLogOp.AllocateBlockIdOp op3 =
|
||||
FSEditLogOp.AllocateBlockIdOp.getInstance(cache.get())
|
||||
.setBlockId(0);
|
||||
op3.setTransactionId(3);
|
||||
buffer.writeOp(op3);
|
||||
buffer.writeOp(op3, fakeLogVersion);
|
||||
|
||||
GenericTestUtils.LogCapturer logs =
|
||||
GenericTestUtils.LogCapturer.captureLogs(EditsDoubleBuffer.LOG);
|
||||
|
Loading…
Reference in New Issue
Block a user