HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp implementations. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151238 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-07-26 20:46:58 +00:00
parent 60e4947cc7
commit 438c32aaf9
6 changed files with 937 additions and 223 deletions

CHANGES.txt

@@ -601,6 +601,9 @@ Trunk (unreleased changes)
     HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
 
+    HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
+    implementations. (Ivan Kelly via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
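
Taken together, the change replaces logEdit(opcode, Writable...) call sites with typed op objects that carry their own serialization. As a quick orientation before the per-file diffs, this is the new calling pattern, copied from the logDelete() change in FSEditLog.java further down:

    // New pattern (see FSEditLog.java below): build a typed op, then log it.
    DeleteOp op = DeleteOp.getInstance()
      .setPath(src)
      .setTimestamp(timestamp);
    logEdit(op);

    // Old pattern being removed: an opcode plus untyped Writable arguments.
    // logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));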

EditLogBackupOutputStream.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,25 +46,18 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   private NamenodeProtocol backupNode;          // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
-  private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
-  private ArrayList<JournalRecord> bufReady;    // buffer ready for flushing
+  private ArrayList<BufferedOp> bufCurrent;     // current buffer for writing
+  private ArrayList<BufferedOp> bufReady;       // buffer ready for flushing
   private DataOutputBuffer out;     // serialized output sent to backup node
 
-  static class JournalRecord {
-    byte op;
-    Writable[] args;
-
-    JournalRecord(byte op, Writable ... writables) {
-      this.op = op;
-      this.args = writables;
-    }
-
-    void write(DataOutputStream out) throws IOException {
-      out.write(op);
-      if(args == null)
-        return;
-      for(Writable w : args)
-        w.write(out);
-    }
+  private static class BufferedOp {
+    public final FSEditLogOpCodes opCode;
+    public final byte[] bytes;
+
+    public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
+      this.opCode = opCode;
+      this.bytes = bytes;
+    }
   }
@@ -84,8 +78,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
     }
-    this.bufCurrent = new ArrayList<JournalRecord>();
-    this.bufReady = new ArrayList<JournalRecord>();
+    this.bufCurrent = new ArrayList<BufferedOp>();
+    this.bufReady = new ArrayList<BufferedOp>();
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
@@ -100,13 +94,18 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   }
 
   @Override // EditLogOutputStream
-  public void write(int b) throws IOException {
-    throw new IOException("Not implemented");
+  void write(FSEditLogOp op) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream s = new DataOutputStream(baos);
+    FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
+    w.writeOp(op);
+    bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
   }
 
-  @Override // EditLogOutputStream
-  void write(byte op, Writable ... writables) throws IOException {
-    bufCurrent.add(new JournalRecord(op, writables));
+  @Override
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    throw new IOException("Not supported");
   }
 
   /**
@@ -134,7 +133,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    ArrayList<JournalRecord> tmp = bufReady;
+    ArrayList<BufferedOp> tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
   }
@@ -144,12 +143,13 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     assert out.size() == 0 : "Output buffer is not empty";
     int bufReadySize = bufReady.size();
     for(int idx = 0; idx < bufReadySize; idx++) {
-      JournalRecord jRec = null;
+      BufferedOp jRec = null;
       for(; idx < bufReadySize; idx++) {
         jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
+        if(jRec.opCode.getOpCode()
+           >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
           break;  // special operation should be sent in a separate call to BN
-        jRec.write(out);
+        out.write(jRec.bytes, 0, jRec.bytes.length);
       }
       if(out.size() > 0)
         send(NamenodeProtocol.JA_JOURNAL);
@@ -157,8 +157,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
         break;
       // operation like start journal spool or increment checkpoint time
       // is a separate call to BN
-      jRec.write(out);
-      send(jRec.op);
+      out.write(jRec.bytes, 0, jRec.bytes.length);
+      send(jRec.opCode.getOpCode());
     }
     bufReady.clear();  // erase all data in the buffer
     out.reset();       // reset buffer to the start position

EditLogFileOutputStream.java

@@ -45,6 +45,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   private FileChannel fc;              // channel of the file stream for sync
   private DataOutputBuffer bufCurrent; // current buffer for writing
   private DataOutputBuffer bufReady;   // buffer ready for flushing
+  private FSEditLogOp.Writer writer;
   final private int initBufferSize;    // inital buffer size
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
@@ -70,6 +71,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     initBufferSize = size;
     bufCurrent = new DataOutputBuffer(size);
     bufReady = new DataOutputBuffer(size);
+    writer = new FSEditLogOp.Writer(bufCurrent);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     fp = new FileOutputStream(rp.getFD()); // open for append
     fc = rp.getChannel();
@@ -88,18 +90,11 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   /** {@inheritDoc} */
   @Override
-  public void write(int b) throws IOException {
-    bufCurrent.write(b);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  void write(byte op, Writable... writables) throws IOException {
+  void write(FSEditLogOp op) throws IOException {
     int start = bufCurrent.getLength();
-    write(op);
-    for (Writable w : writables) {
-      w.write(bufCurrent);
-    }
+
+    writer.writeOp(op);
+
     // write transaction checksum
     int end = bufCurrent.getLength();
     Checksum checksum = FSEditLog.getChecksum();
@@ -109,6 +104,12 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     bufCurrent.writeInt(sum);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    bufCurrent.write(bytes, offset, length);
+  }
+
   /**
    * Create empty edits logs file.
    */
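
The lines elided between the two hunks above compute the per-transaction checksum over exactly the bytes the Writer produced. A rough sketch of that flow, under the assumption that FSEditLog.getChecksum() returns the PureJavaCrc32 instance imported in FSEditLog.java below:

    // Sketch only; the elided code is not shown in this diff.
    int start = bufCurrent.getLength();
    writer.writeOp(op);                           // opcode + op-specific fields
    int end = bufCurrent.getLength();
    Checksum checksum = FSEditLog.getChecksum();  // assumed to be a PureJavaCrc32
    checksum.reset();
    checksum.update(bufCurrent.getData(), start, end - start);
    int sum = (int) checksum.getValue();
    bufCurrent.writeInt(sum);                     // 4-byte transaction checksum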
@@ -136,6 +137,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
       }
       bufCurrent.close();
       bufCurrent = null;
+      writer = null;
     }
 
     if(bufReady != null) {
@@ -156,6 +158,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     } finally {
       IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
       bufCurrent = bufReady = null;
+      writer = null;
       fc = null;
       fp = null;
     }
@@ -168,10 +171,11 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   @Override
   void setReadyToFlush() throws IOException {
     assert bufReady.size() == 0 : "previous data is not flushed yet";
-    write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+    bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
     DataOutputBuffer tmp = bufReady;
     bufReady = bufCurrent;
     bufCurrent = tmp;
+    writer = new FSEditLogOp.Writer(bufCurrent);
   }
 
   /**

EditLogOutputStream.java

@@ -18,17 +18,14 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * A generic abstract class to support journaling of edits logs into
  * a persistent storage.
  */
-abstract class EditLogOutputStream extends OutputStream
-implements JournalStream {
+abstract class EditLogOutputStream implements JournalStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
@@ -37,19 +34,27 @@ implements JournalStream {
     numSync = totalTimeSync = 0;
   }
 
-  /** {@inheritDoc} */
-  abstract public void write(int b) throws IOException;
-
   /**
-   * Write edits log record into the stream.
-   * The record is represented by operation name and
-   * an array of Writable arguments.
+   * Write edits log operation to the stream.
    *
    * @param op operation
-   * @param writables array of Writable arguments
    * @throws IOException
    */
-  abstract void write(byte op, Writable ... writables) throws IOException;
+  abstract void write(FSEditLogOp op) throws IOException;
+
+  /**
+   * Write raw data to an edit log. This data should already have
+   * the transaction ID, checksum, etc included. It is for use
+   * within the BackupNode when replicating edits from the
+   * NameNode.
+   *
+   * @param bytes the bytes to write.
+   * @param offset offset in the bytes to write from
+   * @param length number of bytes to write
+   * @throws IOException
+   */
+  abstract void writeRaw(byte[] bytes, int offset, int length)
+      throws IOException;
 
   /**
    * Create and initialize underlying persistent edits log storage.
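
The FSEditLogOp class itself (the sixth changed file in this commit) is not shown here. Judging only by how it is used in these diffs (getInstance(), fluent setters, the public opCode field, and FSEditLogOp.Writer.writeOp()), a concrete op roughly takes the shape sketched below; the method name writeFields() and the field encodings are illustrative assumptions, not the actual implementation.

    // Illustrative sketch only; not the real FSEditLogOp.java from this commit.
    import java.io.DataOutputStream;
    import java.io.IOException;

    abstract class SketchEditLogOp {
      final FSEditLogOpCodes opCode;   // exposed as op.opCode at the call sites

      SketchEditLogOp(FSEditLogOpCodes opCode) {
        this.opCode = opCode;
      }

      // Hypothetical hook: each op serializes its own fields; a Writer would
      // emit the opcode byte and then call this method from writeOp().
      abstract void writeFields(DataOutputStream out) throws IOException;
    }

    class SketchDeleteOp extends SketchEditLogOp {
      String path;
      long timestamp;

      SketchDeleteOp() {
        super(FSEditLogOpCodes.OP_DELETE);
      }

      SketchDeleteOp setPath(String path) {
        this.path = path;
        return this;
      }

      SketchDeleteOp setTimestamp(long timestamp) {
        this.timestamp = timestamp;
        return this;
      }

      @Override
      void writeFields(DataOutputStream out) throws IOException {
        out.writeUTF(path);        // placeholder encodings, not the on-disk format
        out.writeLong(timestamp);
      }
    }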

FSEditLog.java

@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
+import java.util.zip.CheckedOutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -43,14 +44,13 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NNStorageListener;
 import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -319,7 +319,7 @@ public class FSEditLog implements NNStorageListener {
    * Write an operation to the edit log. Do not sync to persistent
    * store yet.
    */
-  void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
+  void logEdit(FSEditLogOp op) {
     synchronized (this) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
@@ -329,10 +329,10 @@ public class FSEditLog implements NNStorageListener {
     ArrayList<EditLogOutputStream> errorStreams = null;
     long start = now();
     for(EditLogOutputStream eStream : editStreams) {
-      if(!eStream.isOperationSupported(opCode.getOpCode()))
+      if(!eStream.isOperationSupported(op.opCode.getOpCode()))
         continue;
       try {
-        eStream.write(opCode.getOpCode(), writables);
+        eStream.write(op);
       } catch (IOException ie) {
         LOG.error("logEdit: removing "+ eStream.getName(), ie);
         if(errorStreams == null)
@@ -585,49 +585,45 @@ public class FSEditLog implements NNStorageListener {
    * Records the block locations of the last block.
    */
   public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_ADD,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus(),
-            new DeprecatedUTF8(newNode.getClientName()),
-            new DeprecatedUTF8(newNode.getClientMachine()));
+    AddOp op = AddOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus())
+      .setClientName(newNode.getClientName())
+      .setClientMachine(newNode.getClientMachine());
+
+    logEdit(op);
   }
 
   /**
    * Add close lease record to edit log.
    */
   public void logCloseFile(String path, INodeFile newNode) {
-    DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogReplication(newNode.getReplication()),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime()),
-      FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
-    logEdit(OP_CLOSE,
-            new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
-            new ArrayWritable(Block.class, newNode.getBlocks()),
-            newNode.getPermissionStatus());
+    CloseOp op = CloseOp.getInstance()
+      .setPath(path)
+      .setReplication(newNode.getReplication())
+      .setModificationTime(newNode.getModificationTime())
+      .setAccessTime(newNode.getAccessTime())
+      .setBlockSize(newNode.getPreferredBlockSize())
+      .setBlocks(newNode.getBlocks())
+      .setPermissionStatus(newNode.getPermissionStatus());
+
+    logEdit(op);
   }
 
   /**
    * Add create directory record to edit log
    */
   public void logMkDir(String path, INode newNode) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      FSEditLog.toLogLong(newNode.getModificationTime()),
-      FSEditLog.toLogLong(newNode.getAccessTime())
-    };
-    logEdit(OP_MKDIR,
-            new ArrayWritable(DeprecatedUTF8.class, info),
-            newNode.getPermissionStatus());
+    MkdirOp op = MkdirOp.getInstance()
+      .setPath(path)
+      .setTimestamp(newNode.getModificationTime())
+      .setPermissionStatus(newNode.getPermissionStatus());
+    logEdit(op);
   }
 
   /**
@@ -635,33 +631,33 @@ public class FSEditLog implements NNStorageListener {
    * TODO: use String parameters until just before writing to disk
    */
   void logRename(String src, String dst, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+    RenameOldOp op = RenameOldOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
 
   /**
    * Add rename record to edit log
    */
   void logRename(String src, String dst, long timestamp, Options.Rename... options) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(src),
-      new DeprecatedUTF8(dst),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME,
-            new ArrayWritable(DeprecatedUTF8.class, info),
-            toBytesWritable(options));
+    RenameOp op = RenameOp.getInstance()
+      .setSource(src)
+      .setDestination(dst)
+      .setTimestamp(timestamp)
+      .setOptions(options);
+    logEdit(op);
   }
 
   /**
    * Add set replication record to edit log
    */
   void logSetReplication(String src, short replication) {
-    logEdit(OP_SET_REPLICATION,
-            new DeprecatedUTF8(src),
-            FSEditLog.toLogReplication(replication));
+    SetReplicationOp op = SetReplicationOp.getInstance()
+      .setPath(src)
+      .setReplication(replication);
+    logEdit(op);
   }
 
   /** Add set namespace quota record to edit log
@@ -670,64 +666,69 @@ public class FSEditLog implements NNStorageListener {
    * @param quota the directory size limit
    */
   void logSetQuota(String src, long nsQuota, long dsQuota) {
-    logEdit(OP_SET_QUOTA,
-            new DeprecatedUTF8(src),
-            new LongWritable(nsQuota), new LongWritable(dsQuota));
+    SetQuotaOp op = SetQuotaOp.getInstance()
+      .setSource(src)
+      .setNSQuota(nsQuota)
+      .setDSQuota(dsQuota);
+    logEdit(op);
   }
 
   /** Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
-    logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+    SetPermissionsOp op = SetPermissionsOp.getInstance()
+      .setSource(src)
+      .setPermissions(permissions);
+    logEdit(op);
   }
 
   /** Add set owner record to edit log */
   void logSetOwner(String src, String username, String groupname) {
-    DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
-    DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
-    logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+    SetOwnerOp op = SetOwnerOp.getInstance()
+      .setSource(src)
+      .setUser(username)
+      .setGroup(groupname);
+    logEdit(op);
   }
 
   /**
    * concat(trg,src..) log
    */
   void logConcat(String trg, String [] srcs, long timestamp) {
-    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
-    int idx = 0;
-    info[idx++] = new DeprecatedUTF8(trg);
-    for(int i=0; i<srcs.length; i++) {
-      info[idx++] = new DeprecatedUTF8(srcs[i]);
-    }
-    info[idx] = FSEditLog.toLogLong(timestamp);
-    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+      .setTarget(trg)
+      .setSources(srcs)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
 
   /**
    * Add delete file record to edit log
    */
   void logDelete(String src, long timestamp) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+    DeleteOp op = DeleteOp.getInstance()
+      .setPath(src)
+      .setTimestamp(timestamp);
+    logEdit(op);
   }
 
   /**
    * Add generation stamp record to edit log
    */
   void logGenerationStamp(long genstamp) {
-    logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+    SetGenstampOp op = SetGenstampOp.getInstance()
+      .setGenerationStamp(genstamp);
+    logEdit(op);
   }
 
   /**
    * Add access time record to edit log
    */
   void logTimes(String src, long mtime, long atime) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(src),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+    TimesOp op = TimesOp.getInstance()
+      .setPath(src)
+      .setModificationTime(mtime)
+      .setAccessTime(atime);
+    logEdit(op);
   }
 
   /**
@@ -735,14 +736,13 @@ public class FSEditLog implements NNStorageListener {
    */
   void logSymlink(String path, String value, long mtime,
                   long atime, INodeSymlink node) {
-    DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
-      new DeprecatedUTF8(path),
-      new DeprecatedUTF8(value),
-      FSEditLog.toLogLong(mtime),
-      FSEditLog.toLogLong(atime)};
-    logEdit(OP_SYMLINK,
-            new ArrayWritable(DeprecatedUTF8.class, info),
-            node.getPermissionStatus());
+    SymlinkOp op = SymlinkOp.getInstance()
+      .setPath(path)
+      .setValue(value)
+      .setModificationTime(mtime)
+      .setAccessTime(atime)
+      .setPermissionStatus(node.getPermissionStatus());
+    logEdit(op);
   }
 
   /**
@@ -753,36 +753,40 @@ public class FSEditLog implements NNStorageListener {
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
 
   void logRenewDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
-    logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+    RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id)
+      .setExpiryTime(expiryTime);
+    logEdit(op);
   }
 
   void logCancelDelegationToken(DelegationTokenIdentifier id) {
-    logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+    CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+      .setDelegationTokenIdentifier(id);
+    logEdit(op);
   }
 
   void logUpdateMasterKey(DelegationKey key) {
-    logEdit(OP_UPDATE_MASTER_KEY, key);
+    UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+      .setDelegationKey(key);
+    logEdit(op);
   }
 
   void logReassignLease(String leaseHolder, String src, String newHolder) {
-    logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
-        new DeprecatedUTF8(src),
-        new DeprecatedUTF8(newHolder));
+    ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+      .setLeaseHolder(leaseHolder)
+      .setPath(src)
+      .setNewHolder(newHolder);
+    logEdit(op);
   }
 
-  static private DeprecatedUTF8 toLogReplication(short replication) {
-    return new DeprecatedUTF8(Short.toString(replication));
-  }
-
-  static private DeprecatedUTF8 toLogLong(long timestamp) {
-    return new DeprecatedUTF8(Long.toString(timestamp));
-  }
-
   /**
    * Return the size of the current EditLog
    */
@@ -1030,7 +1034,7 @@ public class FSEditLog implements NNStorageListener {
       boStream = new EditLogBackupOutputStream(bnReg, nnReg);
       editStreams.add(boStream);
     }
-    logEdit(OP_JSPOOL_START, (Writable[])null);
+    logEdit(JSpoolStartOp.getInstance());
   }
 
   /**
@@ -1044,7 +1048,7 @@ public class FSEditLog implements NNStorageListener {
     long start = now();
     for(EditLogOutputStream eStream : editStreams) {
       try {
-        eStream.write(data, 0, length);
+        eStream.writeRaw(data, 0, length);
       } catch (IOException ie) {
         LOG.warn("Error in editStream " + eStream.getName(), ie);
         if(errorStreams == null)
@@ -1127,8 +1131,9 @@ public class FSEditLog implements NNStorageListener {
   void incrementCheckpointTime() {
     storage.incrementCheckpointTime();
-    Writable[] args = {new LongWritable(storage.getCheckpointTime())};
-    logEdit(OP_CHECKPOINT_TIME, args);
+    CheckpointTimeOp op = CheckpointTimeOp.getInstance()
+      .setCheckpointTime(storage.getCheckpointTime());
+    logEdit(op);
   }
 
   synchronized void releaseBackupStream(NamenodeRegistration registration) {
@@ -1179,13 +1184,6 @@ public class FSEditLog implements NNStorageListener {
     return regAllowed;
   }
 
-  static BytesWritable toBytesWritable(Options.Rename... options) {
-    byte[] bytes = new byte[options.length];
-    for (int i = 0; i < options.length; i++) {
-      bytes[i] = options[i].value();
-    }
-    return new BytesWritable(bytes);
-  }
-
   /**
    * Get the StorageDirectory for a stream