HDFS-11306. Print remaining edit logs from buffer if edit log can't be rolled. Contributed by Wei-Chiu Chuang.
This commit is contained in:
parent
d3170f9eba
commit
1cde954a4f
@ -17,9 +17,15 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
|
||||
@ -37,6 +43,7 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class EditsDoubleBuffer {
|
||||
protected static final Log LOG = LogFactory.getLog(EditsDoubleBuffer.class);
|
||||
|
||||
private TxnBuffer bufCurrent; // current buffer for writing
|
||||
private TxnBuffer bufReady; // buffer ready for flushing
|
||||
@ -63,6 +70,7 @@ public void close() throws IOException {
|
||||
|
||||
int bufSize = bufCurrent.size();
|
||||
if (bufSize != 0) {
|
||||
bufCurrent.dumpRemainingEditLogs();
|
||||
throw new IOException("FSEditStream has " + bufSize
|
||||
+ " bytes still to be flushed and cannot be closed.");
|
||||
}
|
||||
@ -157,6 +165,32 @@ public DataOutputBuffer reset() {
|
||||
numTxns = 0;
|
||||
return this;
|
||||
}
|
||||
|
||||
private void dumpRemainingEditLogs() {
|
||||
byte[] buf = this.getData();
|
||||
byte[] remainingRawEdits = Arrays.copyOfRange(buf, 0, this.size());
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(remainingRawEdits);
|
||||
DataInputStream dis = new DataInputStream(bis);
|
||||
FSEditLogLoader.PositionTrackingInputStream tracker =
|
||||
new FSEditLogLoader.PositionTrackingInputStream(bis);
|
||||
FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(dis, tracker,
|
||||
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||
FSEditLogOp op;
|
||||
LOG.warn("The edits buffer is " + size() + " bytes long with " + numTxns +
|
||||
" unflushed transactions. " +
|
||||
"Below is the list of unflushed transactions:");
|
||||
int numTransactions = 0;
|
||||
try {
|
||||
while ((op = reader.readOp(false)) != null) {
|
||||
LOG.warn("Unflushed op [" + numTransactions + "]: " + op);
|
||||
numTransactions++;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// If any exceptions, print raw bytes and stop.
|
||||
LOG.warn("Unable to dump remaining ops. Remaining raw bytes: " +
|
||||
Hex.encodeHexString(remainingRawEdits), ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -25,6 +25,8 @@
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestEditsDoubleBuffer {
|
||||
@ -81,4 +83,56 @@ public void shouldFailToCloseWhenUnflushed() throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpEdits() throws IOException {
|
||||
final int defaultBufferSize = 256;
|
||||
EditsDoubleBuffer buffer = new EditsDoubleBuffer(defaultBufferSize);
|
||||
FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
|
||||
|
||||
String src = "/testdumpedits";
|
||||
short replication = 1;
|
||||
|
||||
FSEditLogOp.SetReplicationOp op =
|
||||
FSEditLogOp.SetReplicationOp.getInstance(cache.get())
|
||||
.setPath(src)
|
||||
.setReplication(replication);
|
||||
op.setTransactionId(1);
|
||||
buffer.writeOp(op);
|
||||
|
||||
src = "/testdumpedits2";
|
||||
|
||||
FSEditLogOp.DeleteOp op2 =
|
||||
FSEditLogOp.DeleteOp.getInstance(cache.get())
|
||||
.setPath(src)
|
||||
.setTimestamp(0);
|
||||
op2.setTransactionId(2);
|
||||
buffer.writeOp(op2);
|
||||
|
||||
FSEditLogOp.AllocateBlockIdOp op3 =
|
||||
FSEditLogOp.AllocateBlockIdOp.getInstance(cache.get())
|
||||
.setBlockId(0);
|
||||
op3.setTransactionId(3);
|
||||
buffer.writeOp(op3);
|
||||
|
||||
GenericTestUtils.LogCapturer logs =
|
||||
GenericTestUtils.LogCapturer.captureLogs(EditsDoubleBuffer.LOG);
|
||||
try {
|
||||
buffer.close();
|
||||
fail();
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"bytes still to be flushed and cannot be closed.",
|
||||
ioe);
|
||||
EditsDoubleBuffer.LOG.info("Exception expected: ", ioe);
|
||||
}
|
||||
logs.stopCapturing();
|
||||
// Make sure ops are dumped into log in human readable format.
|
||||
Assert.assertTrue("expected " + op.toString() + " in the log",
|
||||
logs.getOutput().contains(op.toString()));
|
||||
Assert.assertTrue("expected " + op2.toString() + " in the log",
|
||||
logs.getOutput().contains(op2.toString()));
|
||||
Assert.assertTrue("expected " + op3.toString() + " in the log",
|
||||
logs.getOutput().contains(op3.toString()));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user