HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-01-27 12:58:10 -08:00
parent 1e2d98a394
commit 2848db814a
42 changed files with 1530 additions and 571 deletions

View File

@ -47,6 +47,10 @@
* <li> SYNC_BLOCK - to force closed blocks to the disk device.
* In addition {@link Syncable#hsync()} should be called after each write,
* if true synchronous behavior is required.</li>
* <li> LAZY_PERSIST - Create the block on transient storage (RAM) if
* available.</li>
* <li> APPEND_NEWBLOCK - Append data to a new block instead of end of the last
* partial block.</li>
* </ol>
*
* Following combination is not valid and will result in
@ -93,7 +97,13 @@ public enum CreateFlag {
* This flag must only be used for intermediate data whose loss can be
* tolerated by the application.
*/
LAZY_PERSIST((short) 0x10);
LAZY_PERSIST((short) 0x10),
/**
* Append data to a new block instead of the end of the last partial block.
* This is only useful for APPEND.
*/
NEW_BLOCK((short) 0x20);
private final short mode;
@ -149,4 +159,16 @@ public static void validate(Object path, boolean pathExists,
+ ". Create option is not specified in " + flag);
}
}
/**
* Validate the CreateFlag for the append operation. The flag must contain
* APPEND, and cannot contain OVERWRITE.
*/
public static void validateForAppend(EnumSet<CreateFlag> flag) {
validate(flag);
if (!flag.contains(APPEND)) {
throw new HadoopIllegalArgumentException(flag
+ " does not contain APPEND");
}
}
}

View File

@ -165,7 +165,7 @@ protected synchronized int flushBuffer(boolean keep,
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0;
count = 0;
}
}

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@ -147,7 +149,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
fos = dfsClient.append(fileIdPath, bufferSize, null, null);
fos = dfsClient.append(fileIdPath, bufferSize,
EnumSet.of(CreateFlag.APPEND), null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
} catch (RemoteException e) {

View File

@ -18,6 +18,8 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
HDFS-3689. Add support for variable length block. (jing9)
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.

View File

@ -1656,9 +1656,8 @@ public DFSOutputStream create(String src,
* @param checksumOpt checksum options
*
* @return output stream
*
* @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable,
* boolean, short, long) for detailed description of exceptions thrown
*
* @see ClientProtocol#create for detailed description of exceptions thrown
*/
public DFSOutputStream create(String src,
FsPermission permission,
@ -1732,7 +1731,7 @@ private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
}
return null;
}
return callAppend(src, buffersize, progress);
return callAppend(src, buffersize, flag, progress);
}
return null;
}
@ -1810,11 +1809,16 @@ public String getLinkTarget(String path) throws IOException {
}
/** Method to get stream returned by append call */
private DFSOutputStream callAppend(String src,
int buffersize, Progressable progress) throws IOException {
LastBlockWithStatus lastBlockWithStatus = null;
private DFSOutputStream callAppend(String src, int buffersize,
EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
CreateFlag.validateForAppend(flag);
try {
lastBlockWithStatus = namenode.append(src, clientName);
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
new EnumSetWritable<>(flag, CreateFlag.class));
return DFSOutputStream.newStreamForAppend(this, src,
flag.contains(CreateFlag.NEW_BLOCK),
buffersize, progress, blkWithStatus.getLastBlock(),
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
@ -1824,10 +1828,6 @@ private DFSOutputStream callAppend(String src,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
}
HdfsFileStatus newStat = lastBlockWithStatus.getFileStatus();
return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
lastBlockWithStatus.getLastBlock(), newStat,
dfsClientConf.createChecksum());
}
/**
@ -1835,23 +1835,25 @@ private DFSOutputStream callAppend(String src,
*
* @param src file name
* @param buffersize buffer size
* @param flag indicates whether to append data to a new block instead of
* the last block
* @param progress for reporting write-progress; null is acceptable.
* @param statistics file system statistics; null is acceptable.
* @return an output stream for writing into the file
*
* @see ClientProtocol#append(String, String)
* @see ClientProtocol#append(String, String, EnumSetWritable)
*/
public HdfsDataOutputStream append(final String src, final int buffersize,
final Progressable progress, final FileSystem.Statistics statistics
) throws IOException {
final DFSOutputStream out = append(src, buffersize, progress);
EnumSet<CreateFlag> flag, final Progressable progress,
final FileSystem.Statistics statistics) throws IOException {
final DFSOutputStream out = append(src, buffersize, flag, progress);
return createWrappedOutputStream(out, statistics, out.getInitialLen());
}
private DFSOutputStream append(String src, int buffersize, Progressable progress)
throws IOException {
private DFSOutputStream append(String src, int buffersize,
EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
checkOpen();
final DFSOutputStream result = callAppend(src, buffersize, progress);
final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
beginFileLease(result.getFileId(), result);
return result;
}
@ -1938,7 +1940,7 @@ public boolean rename(String src, String dst) throws IOException {
/**
* Move blocks from src to trg and delete src
* See {@link ClientProtocol#concat(String, String [])}.
* See {@link ClientProtocol#concat}.
*/
public void concat(String trg, String [] srcs) throws IOException {
checkOpen();
@ -1980,7 +1982,7 @@ public void rename(String src, String dst, Options.Rename... options)
/**
* Truncate a file to an indicated size
* See {@link ClientProtocol#truncate(String, long)}.
* See {@link ClientProtocol#truncate}.
*/
public boolean truncate(String src, long newLength) throws IOException {
checkOpen();
@ -3005,7 +3007,7 @@ public boolean primitiveMkdir(String src, FsPermission absPermission,
/**
* Get {@link ContentSummary} rooted at the specified directory.
* @param path The string representation of the path
* @param src The string representation of the path
*
* @see ClientProtocol#getContentSummary(String)
*/

View File

@ -426,15 +426,16 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, Span span) {
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
/**
* Construct a data streamer for append
* Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @param bytesPerChecksum number of bytes per checksum
@ -1716,7 +1717,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
streamer = new DataStreamer(stat, traceSpan);
streamer = new DataStreamer(stat, null, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@ -1773,7 +1774,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
}
/** Construct a new output stream for append. */
private DFSOutputStream(DFSClient dfsClient, String src,
private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
DataChecksum checksum) throws IOException {
this(dfsClient, src, progress, stat, checksum);
@ -1785,21 +1786,24 @@ private DFSOutputStream(DFSClient dfsClient, String src,
}
// The last partial block of the file has to be filled.
if (lastBlock != null) {
if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer(stat, traceSpan);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
bytesPerChecksum);
streamer = new DataStreamer(stat,
lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
int buffersize, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src,
boolean toNewBlock, int bufferSize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)
throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
out.start();
return out;
@ -1995,35 +1999,37 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
long toWaitFor;
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
synchronized (this) {
// flush checksum buffer, but keep checksum buffer intact
int numKept = flushBuffer(true, true);
// flush checksum buffer, but keep checksum buffer intact if we do not
// need to end the current block
int numKept = flushBuffer(!endBlock, true);
// bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"DFSClient flush() :" +
" bytesCurBlock " + bytesCurBlock +
" lastFlushOffset " + lastFlushOffset);
DFSClient.LOG.debug("DFSClient flush():"
+ " bytesCurBlock=" + bytesCurBlock
+ " lastFlushOffset=" + lastFlushOffset
+ " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != bytesCurBlock) {
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
if (isSync && currentPacket == null) {
if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now,
// but sync was requested.
// Send an empty packet
// Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
}
} else {
if (isSync && bytesCurBlock > 0) {
if (isSync && bytesCurBlock > 0 && !endBlock) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
// So send an empty sync packet.
// So send an empty sync packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
} else if (currentPacket != null) {
@ -2036,10 +2042,21 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
currentPacket.syncBlock = isSync;
waitAndQueueCurrentPacket();
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
bytesCurBlock -= numKept;
if (endBlock && bytesCurBlock > 0) {
// Need to end the current block, thus send an empty packet to
// indicate this is the end of the block and reset bytesCurBlock
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock || isSync;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
} else {
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
bytesCurBlock -= numKept;
}
toWaitFor = lastQueuedSeqno;
} // end synchronized
@ -2058,8 +2075,8 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try {
dfsClient.namenode.fsync(src, fileId,
dfsClient.clientName, lastBlockLength);
dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
lastBlockLength);
} catch (IOException ioe) {
DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
// If we got an error here, it might be because some other thread called

View File

@ -314,13 +314,19 @@ public FSDataInputStream next(final FileSystem fs, final Path p)
@Override
public FSDataOutputStream append(Path f, final int bufferSize,
final Progressable progress) throws IOException {
return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
}
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
final int bufferSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
return dfs.append(getPathName(p), bufferSize, progress, statistics);
throws IOException {
return dfs.append(getPathName(p), bufferSize, flag, progress,
statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)

View File

@ -101,6 +101,12 @@ public static enum SyncFlag {
* When doing sync to DataNodes, also update the metadata (block length) in
* the NameNode.
*/
UPDATE_LENGTH;
UPDATE_LENGTH,
/**
* Sync the data to DataNode, close the current block, and allocate a new
* block
*/
END_BLOCK;
}
}

View File

@ -463,15 +463,22 @@ public long getTimestamp() {
*/
public static class AppendEvent extends Event {
private String path;
private boolean newBlock;
public static class Builder {
private String path;
private boolean newBlock;
public Builder path(String path) {
this.path = path;
return this;
}
public Builder newBlock(boolean newBlock) {
this.newBlock = newBlock;
return this;
}
public AppendEvent build() {
return new AppendEvent(this);
}
@ -480,11 +487,16 @@ public AppendEvent build() {
private AppendEvent(Builder b) {
super(EventType.APPEND);
this.path = b.path;
this.newBlock = b.newBlock;
}
public String getPath() {
return path;
}
public boolean toNewBlock() {
return newBlock;
}
}
/**

View File

@ -203,6 +203,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
* Append to the end of the file.
* @param src path of the file being created.
* @param clientName name of the current client.
* @param flag indicates whether the data is appended to a new block.
* @return wrapper with information about the last partial block and file
* status if any
* @throws AccessControlException if permission to append file is
@ -225,10 +226,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
* @throws UnsupportedOperationException if append is not supported
*/
@AtMostOnce
public LastBlockWithStatus append(String src, String clientName)
throws AccessControlException, DSQuotaExceededException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
SnapshotAccessControlException, IOException;
public LastBlockWithStatus append(String src, String clientName,
EnumSetWritable<CreateFlag> flag) throws AccessControlException,
DSQuotaExceededException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, SnapshotAccessControlException, IOException;
/**
* Set replication for an existing file.

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@ -65,6 +67,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -187,8 +191,6 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
@ -209,6 +211,7 @@
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
@ -412,8 +415,11 @@ public CreateResponseProto create(RpcController controller,
public AppendResponseProto append(RpcController controller,
AppendRequestProto req) throws ServiceException {
try {
EnumSetWritable<CreateFlag> flags = req.hasFlag() ?
PBHelper.convertCreateFlag(req.getFlag()) :
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
LastBlockWithStatus result = server.append(req.getSrc(),
req.getClientName());
req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
builder.setBlock(PBHelper.convert(result.getLastBlock()));
@ -522,7 +528,7 @@ public GetAdditionalDatanodeResponseProto getAdditionalDatanode(
throw new ServiceException(e);
}
}
@Override
public CompleteResponseProto complete(RpcController controller,
CompleteRequestProto req) throws ServiceException {

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
@ -85,6 +84,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@ -158,13 +158,11 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@ -318,13 +316,12 @@ public boolean truncate(String src, long newLength, String clientName)
}
@Override
public LastBlockWithStatus append(String src, String clientName)
throws AccessControlException, DSQuotaExceededException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
IOException {
AppendRequestProto req = AppendRequestProto.newBuilder()
.setSrc(src)
.setClientName(clientName)
public LastBlockWithStatus append(String src, String clientName,
EnumSetWritable<CreateFlag> flag) throws AccessControlException,
DSQuotaExceededException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
AppendRequestProto req = AppendRequestProto.newBuilder().setSrc(src)
.setClientName(clientName).setFlag(PBHelper.convertCreateFlag(flag))
.build();
try {
AppendResponseProto res = rpcProxy.append(null, req);

View File

@ -1373,6 +1373,9 @@ public static int convertCreateFlag(EnumSetWritable<CreateFlag> flag) {
if (flag.contains(CreateFlag.LAZY_PERSIST)) {
value |= CreateFlagProto.LAZY_PERSIST.getNumber();
}
if (flag.contains(CreateFlag.NEW_BLOCK)) {
value |= CreateFlagProto.NEW_BLOCK.getNumber();
}
return value;
}
@ -1393,7 +1396,11 @@ public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
== CreateFlagProto.LAZY_PERSIST_VALUE) {
result.add(CreateFlag.LAZY_PERSIST);
}
return new EnumSetWritable<CreateFlag>(result);
if ((flag & CreateFlagProto.NEW_BLOCK_VALUE)
== CreateFlagProto.NEW_BLOCK_VALUE) {
result.add(CreateFlag.NEW_BLOCK);
}
return new EnumSetWritable<CreateFlag>(result, CreateFlag.class);
}
public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
@ -2605,11 +2612,11 @@ public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
.build());
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto append =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent.Builder()
.path(reopen.getPath())
.build());
events.add(new Event.AppendEvent.Builder().path(append.getPath())
.newBlock(append.hasNewBlock() && append.getNewBlock())
.build());
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
@ -2710,10 +2717,10 @@ public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList
Event.AppendEvent re2 = (Event.AppendEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
.setContents(InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath())
.setNewBlock(re2.toNewBlock()).build().toByteString())
.build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;

View File

@ -176,7 +176,8 @@ private void onAppend(ChannelHandlerContext ctx) throws IOException {
final int bufferSize = params.bufferSize();
DFSClient dfsClient = newDfsClient(nnId, conf);
OutputStream out = dfsClient.append(path, bufferSize, null, null);
OutputStream out = dfsClient.append(path, bufferSize,
EnumSet.of(CreateFlag.APPEND), null, null);
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
resp.headers().set(CONTENT_LENGTH, 0);
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),

View File

@ -19,11 +19,10 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import java.io.IOException;
import java.util.Arrays;
@ -33,127 +32,29 @@
import static org.apache.hadoop.util.Time.now;
class FSDirConcatOp {
static HdfsFileStatus concat(
FSDirectory fsd, String target, String[] srcs,
static HdfsFileStatus concat(FSDirectory fsd, String target, String[] srcs,
boolean logRetryCache) throws IOException {
Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
Preconditions.checkArgument(srcs != null && srcs.length > 0,
"No sources given");
assert srcs != null;
FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
// We require all files be in the same directory
String trgParent =
target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
for (String s : srcs) {
String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
if (!srcParent.equals(trgParent)) {
throw new IllegalArgumentException(
"Sources and target are not in the same directory");
}
if (FSDirectory.LOG.isDebugEnabled()) {
FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
}
final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
final INodesInPath targetIIP = fsd.getINodesInPath4Write(target);
// write permission for the target
FSPermissionChecker pc = null;
if (fsd.isPermissionEnabled()) {
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.checkPathAccess(pc, trgIip, FsAction.WRITE);
// and srcs
for(String aSrc: srcs) {
final INodesInPath srcIip = fsd.getINodesInPath4Write(aSrc);
fsd.checkPathAccess(pc, srcIip, FsAction.READ); // read the file
fsd.checkParentAccess(pc, srcIip, FsAction.WRITE); // for delete
}
pc = fsd.getPermissionChecker();
fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
}
// to make sure no two files are the same
Set<INode> si = new HashSet<INode>();
// we put the following prerequisite for the operation
// replication and blocks sizes should be the same for ALL the blocks
// check the target
if (fsd.getEZForPath(trgIip) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
if(trgInode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
// per design target shouldn't be empty and all the blocks same size
if(trgInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is empty");
}
if (trgInode.isWithSnapshot()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is in a snapshot");
}
long blockSize = trgInode.getPreferredBlockSize();
// check the end block to be full
final BlockInfo last = trgInode.getLastBlock();
if(blockSize != last.getNumBytes()) {
throw new HadoopIllegalArgumentException("The last block in " + target
+ " is not full; last block size = " + last.getNumBytes()
+ " but file block size = " + blockSize);
}
si.add(trgInode);
final short repl = trgInode.getFileReplication();
// now check the srcs
boolean endSrc = false; // final src file doesn't have to have full end block
for(int i=0; i< srcs.length; i++) {
String src = srcs[i];
if(i== srcs.length-1)
endSrc=true;
final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
if(src.isEmpty()
|| srcInode.isUnderConstruction()
|| srcInode.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: source file " + src
+ " is invalid or empty or underConstruction");
}
// check replication and blocks size
if(repl != srcInode.getBlockReplication()) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same replication: source replication is "
+ srcInode.getBlockReplication()
+ " but target replication is " + repl);
}
//boolean endBlock=false;
// verify that all the blocks are of the same length as target
// should be enough to check the end blocks
final BlockInfo[] srcBlocks = srcInode.getBlocks();
int idx = srcBlocks.length-1;
if(endSrc)
idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
throw new HadoopIllegalArgumentException("concat: the source file "
+ src + " and the target file " + target
+ " should have the same blocks sizes: target block size is "
+ blockSize + " but the size of source block " + idx + " is "
+ srcBlocks[idx].getNumBytes());
}
si.add(srcInode);
}
// make sure no two files are the same
if(si.size() < srcs.length+1) { // trg + srcs
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
verifyTargetFile(fsd, target, targetIIP);
// check the srcs
INodeFile[] srcFiles = verifySrcFiles(fsd, srcs, targetIIP, pc);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
@ -163,71 +64,139 @@ static HdfsFileStatus concat(
long timestamp = now();
fsd.writeLock();
try {
unprotectedConcat(fsd, target, srcs, timestamp);
unprotectedConcat(fsd, targetIIP, srcFiles, timestamp);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
return fsd.getAuditFileInfo(trgIip);
return fsd.getAuditFileInfo(targetIIP);
}
private static void verifyTargetFile(FSDirectory fsd, final String target,
final INodesInPath targetIIP) throws IOException {
// check the target
if (fsd.getEZForPath(targetIIP) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
final INodeFile targetINode = INodeFile.valueOf(targetIIP.getLastINode(),
target);
if(targetINode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
}
private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
INodesInPath targetIIP, FSPermissionChecker pc) throws IOException {
// to make sure no two files are the same
Set<INodeFile> si = new HashSet<>();
final INodeFile targetINode = targetIIP.getLastINode().asFile();
final INodeDirectory targetParent = targetINode.getParent();
// now check the srcs
for(String src : srcs) {
final INodesInPath iip = fsd.getINodesInPath4Write(src);
// permission check for srcs
if (pc != null) {
fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
}
final INode srcINode = iip.getLastINode();
final INodeFile srcINodeFile = INodeFile.valueOf(srcINode, src);
// make sure the src file and the target file are in the same dir
if (srcINodeFile.getParent() != targetParent) {
throw new HadoopIllegalArgumentException("Source file " + src
+ " is not in the same directory with the target "
+ targetIIP.getPath());
}
// make sure all the source files are not in snapshot
if (srcINode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
throw new SnapshotException("Concat: the source file " + src
+ " is in snapshot");
}
// check if the file has other references.
if (srcINode.isReference() && ((INodeReference.WithCount)
srcINode.asReference().getReferredINode()).getReferenceCount() > 1) {
throw new SnapshotException("Concat: the source file " + src
+ " is referred by some other reference in some snapshot.");
}
if (srcINode == targetINode) {
throw new HadoopIllegalArgumentException("concat: the src file " + src
+ " is the same with the target file " + targetIIP.getPath());
}
if(srcINodeFile.isUnderConstruction() || srcINodeFile.numBlocks() == 0) {
throw new HadoopIllegalArgumentException("concat: source file " + src
+ " is invalid or empty or underConstruction");
}
si.add(srcINodeFile);
}
// make sure no two files are the same
if(si.size() < srcs.length) {
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
return si.toArray(new INodeFile[si.size()]);
}
private static long computeQuotaDelta(INodeFile target, INodeFile[] srcList) {
long delta = 0;
short targetRepl = target.getBlockReplication();
for (INodeFile src : srcList) {
if (targetRepl != src.getBlockReplication()) {
delta += src.computeFileSize() *
(targetRepl - src.getBlockReplication());
}
}
return delta;
}
private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
long delta) throws QuotaExceededException {
if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
// Do not check quota if editlog is still being processed
return;
}
FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, 0, delta, null);
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
* @param fsd FSDirectory
* @param target target file to move the blocks to
* @param srcs list of file to move the blocks from
*/
static void unprotectedConcat(
FSDirectory fsd, String target, String[] srcs, long timestamp)
throws IOException {
static void unprotectedConcat(FSDirectory fsd, INodesInPath targetIIP,
INodeFile[] srcList, long timestamp) throws IOException {
assert fsd.hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "
+ targetIIP.getPath());
}
// do the move
final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
final INodeFile trgInode = trgIIP.getLastINode().asFile();
INodeDirectory trgParent = trgIIP.getINode(-2).asDirectory();
final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
final INodeFile trgInode = targetIIP.getLastINode().asFile();
long delta = computeQuotaDelta(trgInode, srcList);
verifyQuota(fsd, targetIIP, delta);
final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
for(int i = 0; i < srcs.length; i++) {
final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
final int latest = iip.getLatestSnapshotId();
final INode inode = iip.getLastINode();
// check if the file in the latest snapshot
if (inode.isInLatestSnapshot(latest)) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is in snapshot " + latest);
}
// check if the file has other references.
if (inode.isReference() && ((INodeReference.WithCount)
inode.asReference().getReferredINode()).getReferenceCount() > 1) {
throw new SnapshotException("Concat: the source file " + srcs[i]
+ " is referred by some other reference in some snapshot.");
}
allSrcInodes[i] = inode.asFile();
}
trgInode.concatBlocks(allSrcInodes);
// the target file can be included in a snapshot
trgInode.recordModification(targetIIP.getLatestSnapshotId());
INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
trgInode.concatBlocks(srcList);
// since we are in the same dir - we can use same parent to remove files
int count = 0;
for(INodeFile nodeToRemove: allSrcInodes) {
if(nodeToRemove == null) continue;
nodeToRemove.setBlocks(null);
trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
fsd.getINodeMap().remove(nodeToRemove);
count++;
for (INodeFile nodeToRemove : srcList) {
if(nodeToRemove != null) {
nodeToRemove.setBlocks(null);
nodeToRemove.getParent().removeChild(nodeToRemove);
fsd.getINodeMap().remove(nodeToRemove);
count++;
}
}
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
// update quota on the parent directory ('count' files removed, 0 space)
FSDirectory.unprotectedUpdateCount(trgIIP, trgIIP.length() - 1, -count, 0);
FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1,
-count, delta);
}
}

View File

@ -452,7 +452,7 @@ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
updateCount(inodesInPath, 0, fileINode.getPreferredBlockDiskspace(), true);
// associate new last block for the file
BlockInfoUnderConstruction blockInfo =
@ -508,7 +508,7 @@ boolean unprotectedRemoveBlock(String path, INodesInPath iip,
}
// update space consumed
updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
updateCount(iip, 0, -fileNode.getPreferredBlockDiskspace(), true);
return true;
}

View File

@ -34,10 +34,10 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -52,9 +52,11 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
@ -76,6 +78,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@ -90,7 +93,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -702,7 +704,19 @@ private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
op.setRpcCallId(Server.getCallId());
}
}
public void logAppendFile(String path, INodeFile file, boolean newBlock,
boolean toLogRpcIds) {
FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
assert uc != null;
AppendOp op = AppendOp.getInstance(cache.get()).setPath(path)
.setClientName(uc.getClientName())
.setClientMachine(uc.getClientMachine())
.setNewBlock(newBlock);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add open lease record to edit log.
* Records the block locations of the last block.

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -41,7 +42,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@ -68,6 +69,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@ -83,7 +85,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
@ -325,22 +326,22 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
LOG.trace("replaying edit log: " + op);
}
final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
final String path =
renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
if (LOG.isDebugEnabled()) {
LOG.debug(op.opCode + ": " + path +
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// There three cases here:
// There are 3 cases here:
// 1. OP_ADD to create a new file
// 2. OP_ADD to update file blocks
// 3. OP_ADD to open file for append
// 3. OP_ADD to open file for append (old append)
// See if the file already exists (persistBlocks call)
INodesInPath iip = fsDir.getINodesInPath(path, true);
@ -383,19 +384,17 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}
} else { // This is OP_ADD on an existing file
} else { // This is OP_ADD on an existing file (old append)
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
// Note we do not replace the INodeFile when converting it to
// under-construction
LocatedBlock lb = fsNamesys.prepareFileForWrite(path, iip,
addCloseOp.clientName, addCloseOp.clientMachine, false, false);
// add the op into retry cache is necessary
LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
addCloseOp.clientName, addCloseOp.clientMachine, false, false,
false);
// add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
fsNamesys.dir,
@ -453,6 +452,34 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
}
break;
}
case OP_APPEND: {
AppendOp appendOp = (AppendOp) op;
final String path = renameReservedPathsOnUpgrade(appendOp.path,
logVersion);
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + path +
" clientName " + appendOp.clientName +
" clientMachine " + appendOp.clientMachine +
" newBlock " + appendOp.newBlock);
}
INodesInPath iip = fsDir.getINodesInPath4Write(path);
INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
if (!file.isUnderConstruction()) {
LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
false, false);
// add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, file,
BlockStoragePolicySuite.ID_UNSPECIFIED,
Snapshot.CURRENT_STATE_ID, false, iip);
fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
}
}
break;
}
case OP_UPDATE_BLOCKS: {
UpdateBlocksOp updateOp = (UpdateBlocksOp)op;
final String path =
@ -499,7 +526,14 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
srcs[i] =
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
}
FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
INodesInPath targetIIP = fsDir.getINodesInPath4Write(trg);
INodeFile[] srcFiles = new INodeFile[srcs.length];
for (int i = 0; i < srcs.length; i++) {
INodesInPath srcIIP = fsDir.getINodesInPath4Write(srcs[i]);
srcFiles[i] = srcIIP.getLastINode().asFile();
}
FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
@ -207,6 +208,7 @@ public OpInstanceCache() {
inst.put(OP_SET_XATTR, new SetXAttrOp());
inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
inst.put(OP_APPEND, new AppendOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@ -428,7 +430,7 @@ static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatin
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
assert(opCode == OP_ADD || opCode == OP_CLOSE);
assert(opCode == OP_ADD || opCode == OP_CLOSE || opCode == OP_APPEND);
}
@Override
@ -770,7 +772,7 @@ private AddOp() {
}
static AddOp getInstance(OpInstanceCache cache) {
return (AddOp)cache.get(OP_ADD);
return (AddOp) cache.get(OP_ADD);
}
@Override
@ -788,7 +790,7 @@ public String toString() {
}
/**
* Although {@link ClientProtocol#appendFile} may also log a close op, we do
* Although {@link ClientProtocol#append} may also log a close op, we do
* not need to record the rpc ids here since a successful appendFile op will
* finally log an AddOp.
*/
@ -814,6 +816,97 @@ public String toString() {
return builder.toString();
}
}
static class AppendOp extends FSEditLogOp {
String path;
String clientName;
String clientMachine;
boolean newBlock;
private AppendOp() {
super(OP_APPEND);
}
static AppendOp getInstance(OpInstanceCache cache) {
return (AppendOp) cache.get(OP_APPEND);
}
AppendOp setPath(String path) {
this.path = path;
return this;
}
AppendOp setClientName(String clientName) {
this.clientName = clientName;
return this;
}
AppendOp setClientMachine(String clientMachine) {
this.clientMachine = clientMachine;
return this;
}
AppendOp setNewBlock(boolean newBlock) {
this.newBlock = newBlock;
return this;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AppendOp ");
builder.append("[path=").append(path);
builder.append(", clientName=").append(clientName);
builder.append(", clientMachine=").append(clientMachine);
builder.append(", newBlock=").append(newBlock).append("]");
return builder.toString();
}
@Override
void resetSubFields() {
this.path = null;
this.clientName = null;
this.clientMachine = null;
this.newBlock = false;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.path = FSImageSerialization.readString(in);
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
this.newBlock = FSImageSerialization.readBoolean(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeString(clientName, out);
FSImageSerialization.writeString(clientMachine, out);
FSImageSerialization.writeBoolean(newBlock, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
XMLUtils.addSaxString(contentHandler, "NEWBLOCK",
Boolean.toString(newBlock));
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH");
this.clientName = st.getValue("CLIENT_NAME");
this.clientMachine = st.getValue("CLIENT_MACHINE");
this.newBlock = Boolean.parseBoolean(st.getValue("NEWBLOCK"));
readRpcIdsFromXml(st);
}
}
static class AddBlockOp extends FSEditLogOp {
private String path;
@ -1643,7 +1736,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
* {@link ClientProtocol#updateBlockForPipeline},
* {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
* already bound with other editlog op which records rpc ids (
* {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
* {@link ClientProtocol#create}). Thus no need to record rpc ids here.
*/
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;

View File

@ -74,6 +74,7 @@ public enum FSEditLogOpCodes {
OP_REMOVE_XATTR ((byte) 44),
OP_SET_STORAGE_POLICY ((byte) 45),
OP_TRUNCATE ((byte) 46),
OP_APPEND ((byte) 47),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);

View File

@ -250,6 +250,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
@ -2586,12 +2587,12 @@ private void setNewINodeStoragePolicy(INodeFile inode,
* <p>
*
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#append(String, String)}
*
* {@link ClientProtocol#append(String, String, EnumSetWritable)}
*
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc,
INodesInPath iip, String holder, String clientMachine,
INodesInPath iip, String holder, String clientMachine, boolean newBlock,
boolean logRetryCache) throws IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@ -2613,7 +2614,6 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
INodeFile myFile = INodeFile.valueOf(inode, src, true);
final BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
if (lpPolicy != null &&
lpPolicy.getId() == myFile.getStoragePolicyID()) {
throw new UnsupportedOperationException(
@ -2629,8 +2629,8 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
throw new IOException("append: lastBlock=" + lastBlock +
" of src=" + src + " is not sufficiently replicated yet.");
}
return prepareFileForWrite(src, iip, holder, clientMachine, true,
logRetryCache);
return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
true, logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@ -2644,6 +2644,7 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
* @param src path to the file
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
* @param newBlock if the data is appended to a new block
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
@ -2651,26 +2652,34 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
* @throws UnresolvedLinkException
* @throws IOException
*/
LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
String leaseHolder, String clientMachine, boolean writeToEditLog,
boolean logRetryCache) throws IOException {
LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
String leaseHolder, String clientMachine, boolean newBlock,
boolean writeToEditLog, boolean logRetryCache) throws IOException {
final INodeFile file = iip.getLastINode().asFile();
file.recordModification(iip.getLatestSnapshotId());
file.toUnderConstruction(leaseHolder, clientMachine);
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
LocatedBlock ret =
blockManager.convertLastBlockToUnderConstruction(file, 0);
if (ret != null) {
// update the quota: use the preferred block size for UC block
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
LocatedBlock ret = null;
if (!newBlock) {
ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
if (ret != null) {
// update the quota: use the preferred block size for UC block
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
}
} else {
BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null) {
ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
ret = new LocatedBlock(blk, new DatanodeInfo[0]);
}
}
if (writeToEditLog) {
getEditLog().logOpenFile(src, file, false, logRetryCache);
getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
}
return ret;
}
@ -2805,11 +2814,12 @@ void recoverLeaseInternal(INodesInPath iip,
/**
* Append to an existing file in the namespace.
*/
LastBlockWithStatus appendFile(
String src, String holder, String clientMachine, boolean logRetryCache)
LastBlockWithStatus appendFile(String src, String holder,
String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
throws IOException {
try {
return appendFileInt(src, holder, clientMachine, logRetryCache);
return appendFileInt(src, holder, clientMachine,
flag.contains(CreateFlag.NEW_BLOCK), logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
@ -2817,7 +2827,8 @@ LastBlockWithStatus appendFile(
}
private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
String clientMachine, boolean logRetryCache) throws IOException {
String clientMachine, boolean newBlock, boolean logRetryCache)
throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
@ -2836,7 +2847,8 @@ private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
checkNameNodeSafeMode("Cannot append to file" + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
lb = appendFileInternal(pc, iip, holder, clientMachine, logRetryCache);
lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
logRetryCache);
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {

View File

@ -412,7 +412,7 @@ public long getHeaderLong() {
}
/** @return the diskspace required for a full block. */
final long getBlockDiskspace() {
final long getPreferredBlockDiskspace() {
return getPreferredBlockSize() * getBlockReplication();
}

View File

@ -65,6 +65,10 @@ public static EventBatch translate(FSEditLogOp op) {
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new EventBatch(op.txid, new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
case OP_APPEND:
FSEditLogOp.AppendOp appendOp = (FSEditLogOp.AppendOp) op;
return new EventBatch(op.txid, new Event[] {new Event.AppendEvent
.Builder().path(appendOp.path).newBlock(appendOp.newBlock).build()});
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new EventBatch(op.txid,

View File

@ -70,7 +70,8 @@ public static enum Feature implements LayoutFeature {
"creating file with overwrite"),
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
TRUNCATE(-61, "Truncate");
TRUNCATE(-61, "Truncate"),
APPEND_NEW_BLOCK(-62, "Support appending to new block");
private final FeatureInfo info;

View File

@ -633,15 +633,16 @@ public HdfsFileStatus create(String src, FsPermission masked,
}
@Override // ClientProtocol
public LastBlockWithStatus append(String src, String clientName)
throws IOException {
public LastBlockWithStatus append(String src, String clientName,
EnumSetWritable<CreateFlag> flag) throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
@ -649,7 +650,7 @@ public LastBlockWithStatus append(String src, String clientName)
LastBlockWithStatus info = null;
boolean success = false;
try {
info = namesystem.appendFile(src, clientName, clientMachine,
info = namesystem.appendFile(src, clientName, clientMachine, flag.get(),
cacheEntry != null);
success = true;
} finally {

View File

@ -66,6 +66,7 @@ enum CreateFlagProto {
OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
APPEND = 0x04; // Append to a file
LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
NEW_BLOCK = 0x20; // Write data to a new block when appending
}
message CreateRequestProto {
@ -86,6 +87,7 @@ message CreateResponseProto {
message AppendRequestProto {
required string src = 1;
required string clientName = 2;
optional uint32 flag = 3; // bits set using CreateFlag
}
message AppendResponseProto {

View File

@ -89,6 +89,7 @@ message CloseEventProto {
message AppendEventProto {
required string path = 1;
optional bool newBlock = 2 [default = false];
}
message RenameEventProto {

View File

@ -136,6 +136,22 @@ public static void check(FileSystem fs, Path p, long length) throws IOException
}
}
public static void check(DistributedFileSystem fs, Path p, int position,
int length) throws IOException {
byte[] buf = new byte[length];
int i = 0;
try {
FSDataInputStream in = fs.open(p);
in.read(position, buf, 0, buf.length);
for(i = position; i < length + position; i++) {
assertEquals((byte) i, buf[i - position]);
}
in.close();
} catch(IOException ioe) {
throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
}
}
/**
* create a buffer that contains the entire test file data.
*/

View File

@ -1132,6 +1132,9 @@ public static void runOperations(MiniDFSCluster cluster,
FSDataOutputStream s = filesystem.create(pathFileCreate);
// OP_CLOSE 9
s.close();
// OP_APPEND 47
FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
s2.close();
// OP_SET_STORAGE_POLICY 45
filesystem.setStoragePolicy(pathFileCreate,
HdfsConstants.HOT_STORAGE_POLICY_NAME);

View File

@ -21,6 +21,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
@ -71,7 +72,7 @@ private static long checkTxid(EventBatch batch, long prevTxid){
*/
@Test
public void testOpcodeCount() {
Assert.assertEquals(48, FSEditLogOpCodes.values().length);
Assert.assertEquals(49, FSEditLogOpCodes.values().length);
}
@ -109,7 +110,8 @@ public void testBasic() throws IOException, URISyntaxException,
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
// AddOp -> AppendEvent
os = client.append("/file2", BLOCK_SIZE, null, null);
os = client.append("/file2", BLOCK_SIZE, EnumSet.of(CreateFlag.APPEND),
null, null);
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
Thread.sleep(10); // so that the atime will get updated on the next line
@ -182,13 +184,14 @@ public void testBasic() throws IOException, URISyntaxException,
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp
// AppendOp
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0];
Assert.assertEquals("/file2", append2.getPath());
Assert.assertFalse(append2.toNewBlock());
// CloseOp
batch = waitForNextEvents(eis);

View File

@ -25,10 +25,12 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HardLink;
@ -344,7 +346,46 @@ public void testAppendTwice() throws Exception {
cluster.shutdown();
}
}
/** Test two consecutive appends on a file with a full block. */
@Test
public void testAppend2Twice() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
final DistributedFileSystem fs1 = cluster.getFileSystem();
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
try {
final Path p = new Path("/testAppendTwice/foo");
final int len = 1 << 16;
final byte[] fileContents = AppendTestUtil.initBuffer(len);
{
// create a new file with a full block.
FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
out.write(fileContents, 0, len);
out.close();
}
//1st append does not add any data so that the last block remains full
//and the last block in INodeFileUnderConstruction is a BlockInfo
//but not BlockInfoUnderConstruction.
((DistributedFileSystem) fs2).append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
// 2nd append should get AlreadyBeingCreatedException
fs1.append(p);
Assert.fail();
} catch(RemoteException re) {
AppendTestUtil.LOG.info("Got an exception:", re);
Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
re.getClassName());
} finally {
fs2.close();
fs1.close();
cluster.shutdown();
}
}
/** Tests appending after soft-limit expires. */
@Test
public void testAppendAfterSoftLimit()
@ -386,6 +427,54 @@ public void testAppendAfterSoftLimit()
}
}
/** Tests appending after soft-limit expires. */
@Test
public void testAppend2AfterSoftLimit() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
//Set small soft-limit for lease
final long softLimit = 1L;
final long hardLimit = 9999999L;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.setLeasePeriod(softLimit, hardLimit);
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
DistributedFileSystem fs2 = new DistributedFileSystem();
fs2.initialize(fs.getUri(), conf);
final Path testPath = new Path("/testAppendAfterSoftLimit");
final byte[] fileContents = AppendTestUtil.initBuffer(32);
// create a new file without closing
FSDataOutputStream out = fs.create(testPath);
out.write(fileContents);
//Wait for > soft-limit
Thread.sleep(250);
try {
FSDataOutputStream appendStream2 = fs2.append(testPath,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
appendStream2.write(fileContents);
appendStream2.close();
assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen());
// make sure we now have 1 block since the first writer was revoked
LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(),
0L);
assertEquals(1, blks.getLocatedBlocks().size());
for (LocatedBlock blk : blks.getLocatedBlocks()) {
assertEquals(fileContents.length, blk.getBlockSize());
}
} finally {
fs.close();
fs2.close();
cluster.shutdown();
}
}
/**
* Old replica of the block should not be accepted as valid for append/read
*/
@ -439,4 +528,77 @@ public void testFailedAppendBlockRejection() throws Exception {
}
}
/**
* Old replica of the block should not be accepted as valid for append/read
*/
@Test
public void testMultiAppend2() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
"false");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.build();
DistributedFileSystem fs = null;
final String hello = "hello\n";
try {
fs = cluster.getFileSystem();
Path path = new Path("/test");
FSDataOutputStream out = fs.create(path);
out.writeBytes(hello);
out.close();
// stop one datanode
DataNodeProperties dnProp = cluster.stopDataNode(0);
String dnAddress = dnProp.datanode.getXferAddress().toString();
if (dnAddress.startsWith("/")) {
dnAddress = dnAddress.substring(1);
}
// append again to bump genstamps
for (int i = 0; i < 2; i++) {
out = fs.append(path,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
out.writeBytes(hello);
out.close();
}
// re-open and make the block state as underconstruction
out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
4096, null);
cluster.restartDataNode(dnProp, true);
// wait till the block report comes
Thread.sleep(2000);
out.writeBytes(hello);
out.close();
// check the block locations
LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
// since we append the file 3 time, we should be 4 blocks
assertEquals(4, blocks.getLocatedBlocks().size());
for (LocatedBlock block : blocks.getLocatedBlocks()) {
assertEquals(hello.length(), block.getBlockSize());
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 4; i++) {
sb.append(hello);
}
final byte[] content = sb.toString().getBytes();
AppendTestUtil.checkFullFile(fs, path, content.length, content,
"Read /test");
// restart namenode to make sure the editlog can be properly applied
cluster.restartNameNode(true);
cluster.waitActive();
AppendTestUtil.checkFullFile(fs, path, content.length, content,
"Read /test");
blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
// since we append the file 3 time, we should be 4 blocks
assertEquals(4, blocks.getLocatedBlocks().size());
for (LocatedBlock block : blocks.getLocatedBlocks()) {
assertEquals(hello.length(), block.getBlockSize());
}
} finally {
IOUtils.closeStream(fs);
cluster.shutdown();
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -24,14 +25,18 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -67,11 +72,7 @@ public class TestFileAppend2 {
final int numberOfFiles = 50;
final int numThreads = 10;
final int numAppendsPerThread = 20;
/***
int numberOfFiles = 1;
int numThreads = 1;
int numAppendsPerThread = 2000;
****/
Workload[] workload = null;
final ArrayList<Path> testFiles = new ArrayList<Path>();
volatile static boolean globalStatus = true;
@ -229,16 +230,170 @@ public void testSimpleAppend() throws IOException {
}
}
/**
* Creates one file, writes a few bytes to it and then closed it.
* Reopens the same file for appending using append2 API, write all blocks and
* then close. Verify that all data exists in file.
*/
@Test
public void testSimpleAppend2() throws Exception {
final Configuration conf = new HdfsConfiguration();
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
DistributedFileSystem fs = cluster.getFileSystem();
try {
{ // test appending to a file.
// create a new file.
Path file1 = new Path("/simpleAppend.dat");
FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
System.out.println("Created file simpleAppend.dat");
// write to file
int mid = 186; // io.bytes.per.checksum bytes
System.out.println("Writing " + mid + " bytes to file " + file1);
stm.write(fileContents, 0, mid);
stm.close();
System.out.println("Wrote and Closed first part of file.");
// write to file
int mid2 = 607; // io.bytes.per.checksum bytes
System.out.println("Writing " + mid + " bytes to file " + file1);
stm = fs.append(file1,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
stm.write(fileContents, mid, mid2-mid);
stm.close();
System.out.println("Wrote and Closed second part of file.");
// write the remainder of the file
stm = fs.append(file1,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
// ensure getPos is set to reflect existing size of the file
assertTrue(stm.getPos() > 0);
System.out.println("Writing " + (AppendTestUtil.FILE_SIZE - mid2) +
" bytes to file " + file1);
stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2);
System.out.println("Written second part of file");
stm.close();
System.out.println("Wrote and Closed second part of file.");
// verify that entire file is good
AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
fileContents, "Read 2");
// also make sure there three different blocks for the file
List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
file1.toString(), 0L).getLocatedBlocks();
assertEquals(12, blocks.size()); // the block size is 1024
assertEquals(mid, blocks.get(0).getBlockSize());
assertEquals(mid2 - mid, blocks.get(1).getBlockSize());
for (int i = 2; i < 11; i++) {
assertEquals(AppendTestUtil.BLOCK_SIZE, blocks.get(i).getBlockSize());
}
assertEquals((AppendTestUtil.FILE_SIZE - mid2)
% AppendTestUtil.BLOCK_SIZE, blocks.get(11).getBlockSize());
}
{ // test appending to an non-existing file.
FSDataOutputStream out = null;
try {
out = fs.append(new Path("/non-existing.dat"),
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("Expected to have FileNotFoundException");
} catch(java.io.FileNotFoundException fnfe) {
System.out.println("Good: got " + fnfe);
fnfe.printStackTrace(System.out);
} finally {
IOUtils.closeStream(out);
}
}
{ // test append permission.
// set root to all writable
Path root = new Path("/");
fs.setPermission(root, new FsPermission((short)0777));
fs.close();
// login as a different user
final UserGroupInformation superuser =
UserGroupInformation.getCurrentUser();
String username = "testappenduser";
String group = "testappendgroup";
assertFalse(superuser.getShortUserName().equals(username));
assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
UserGroupInformation appenduser = UserGroupInformation
.createUserForTesting(username, new String[] { group });
fs = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(appenduser,
conf);
// create a file
Path dir = new Path(root, getClass().getSimpleName());
Path foo = new Path(dir, "foo.dat");
FSDataOutputStream out = null;
int offset = 0;
try {
out = fs.create(foo);
int len = 10 + AppendTestUtil.nextInt(100);
out.write(fileContents, offset, len);
offset += len;
} finally {
IOUtils.closeStream(out);
}
// change dir and foo to minimal permissions.
fs.setPermission(dir, new FsPermission((short)0100));
fs.setPermission(foo, new FsPermission((short)0200));
// try append, should success
out = null;
try {
out = fs.append(foo,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
int len = 10 + AppendTestUtil.nextInt(100);
out.write(fileContents, offset, len);
offset += len;
} finally {
IOUtils.closeStream(out);
}
// change dir and foo to all but no write on foo.
fs.setPermission(foo, new FsPermission((short)0577));
fs.setPermission(dir, new FsPermission((short)0777));
// try append, should fail
out = null;
try {
out = fs.append(foo,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("Expected to have AccessControlException");
} catch(AccessControlException ace) {
System.out.println("Good: got " + ace);
ace.printStackTrace(System.out);
} finally {
IOUtils.closeStream(out);
}
}
} finally {
fs.close();
cluster.shutdown();
}
}
//
// an object that does a bunch of appends to files
//
class Workload extends Thread {
private final int id;
private final MiniDFSCluster cluster;
private final boolean appendToNewBlock;
Workload(MiniDFSCluster cluster, int threadIndex) {
Workload(MiniDFSCluster cluster, int threadIndex, boolean append2) {
id = threadIndex;
this.cluster = cluster;
this.appendToNewBlock = append2;
}
// create a bunch of files. Write to them and then verify.
@ -261,7 +416,7 @@ public void run() {
long len = 0;
int sizeToAppend = 0;
try {
FileSystem fs = cluster.getFileSystem();
DistributedFileSystem fs = cluster.getFileSystem();
// add a random number of bytes to file
len = fs.getFileStatus(testfile).getLen();
@ -285,7 +440,9 @@ public void run() {
" appending " + sizeToAppend + " bytes " +
" to file " + testfile +
" of size " + len);
FSDataOutputStream stm = fs.append(testfile);
FSDataOutputStream stm = appendToNewBlock ? fs.append(testfile,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)
: fs.append(testfile);
stm.write(fileContents, (int)len, sizeToAppend);
stm.close();
@ -298,7 +455,7 @@ public void run() {
" expected size " + (len + sizeToAppend) +
" waiting for namenode metadata update.");
Thread.sleep(5000);
} catch (InterruptedException e) {;}
} catch (InterruptedException e) {}
}
assertTrue("File " + testfile + " size is " +
@ -306,7 +463,7 @@ public void run() {
" but expected " + (len + sizeToAppend),
fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend),
AppendTestUtil.checkFullFile(fs, testfile, (int) (len + sizeToAppend),
fileContents, "Read 2");
} catch (Throwable e) {
globalStatus = false;
@ -331,10 +488,8 @@ public void run() {
/**
* Test that appends to files at random offsets.
* @throws IOException an exception might be thrown
*/
@Test
public void testComplexAppend() throws IOException {
private void testComplexAppend(boolean appendToNewBlock) throws IOException {
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
@ -366,7 +521,7 @@ public void testComplexAppend() throws IOException {
// Create threads and make them run workload concurrently.
workload = new Workload[numThreads];
for (int i = 0; i < numThreads; i++) {
workload[i] = new Workload(cluster, i);
workload[i] = new Workload(cluster, i, appendToNewBlock);
workload[i].start();
}
@ -390,4 +545,14 @@ public void testComplexAppend() throws IOException {
//
assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
}
@Test
public void testComplexAppend() throws IOException {
testComplexAppend(false);
}
@Test
public void testComplexAppend2() throws IOException {
testComplexAppend(true);
}
}

View File

@ -24,7 +24,10 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.fs.CreateFlag;
import org.mockito.invocation.InvocationOnMock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -36,8 +39,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -52,6 +53,7 @@
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -121,6 +123,32 @@ public void testTC1() throws Exception {
AppendTestUtil.check(fs, p, len1 + len2);
}
@Test
public void testTC1ForAppend2() throws Exception {
final Path p = new Path("/TC1/foo2");
//a. Create file and write one block of data. Close file.
final int len1 = (int) BLOCK_SIZE;
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
// Reopen file to append. Append half block of data. Close file.
final int len2 = (int) BLOCK_SIZE / 2;
{
FSDataOutputStream out = fs.append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
AppendTestUtil.write(out, len1, len2);
out.close();
}
// b. Reopen file and read 1.5 blocks worth of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
}
/**
* TC2: Append on non-block boundary.
* @throws IOException an exception might be thrown
@ -152,6 +180,40 @@ public void testTC2() throws Exception {
AppendTestUtil.check(fs, p, len1 + len2);
}
@Test
public void testTC2ForAppend2() throws Exception {
final Path p = new Path("/TC2/foo2");
//a. Create file with one and a half block of data. Close file.
final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
AppendTestUtil.check(fs, p, len1);
// Reopen file to append quarter block of data. Close file.
final int len2 = (int) BLOCK_SIZE / 4;
{
FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
4096, null);
AppendTestUtil.write(out, len1, len2);
out.close();
}
// b. Reopen file and read 1.75 blocks of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
p.toString(), 0L).getLocatedBlocks();
Assert.assertEquals(3, blocks.size());
Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
}
/**
* TC5: Only one simultaneous append.
* @throws IOException an exception might be thrown
@ -179,18 +241,63 @@ public void testTC5() throws Exception {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
try {
((DistributedFileSystem) AppendTestUtil
.createHdfsWithDifferentUsername(conf)).append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
//d. On Machine M1, close file.
out.close();
}
@Test
public void testTC5ForAppend2() throws Exception {
final Path p = new Path("/TC5/foo2");
// a. Create file on Machine M1. Write half block to it. Close file.
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
out.close();
}
// b. Reopen file in "append" mode on Machine M1.
FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
4096, null);
// c. On Machine M2, reopen file in "append" mode. This should fail.
try {
((DistributedFileSystem) AppendTestUtil
.createHdfsWithDifferentUsername(conf)).append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
try {
AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
// d. On Machine M1, close file.
out.close();
}
/**
* TC7: Corrupted replicas are present.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC7() throws Exception {
private void testTC7(boolean appendToNewBlock) throws Exception {
final short repl = 2;
final Path p = new Path("/TC7/foo");
final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with replication factor of 2. Write half block of data. Close file.
@ -224,7 +331,8 @@ public void testTC7() throws Exception {
//c. Open file in "append mode". Append a new block worth of data. Close file.
final int len2 = (int)BLOCK_SIZE;
{
FSDataOutputStream out = fs.append(p);
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
@ -233,13 +341,21 @@ public void testTC7() throws Exception {
AppendTestUtil.check(fs, p, len1 + len2);
}
@Test
public void testTC7() throws Exception {
testTC7(false);
}
@Test
public void testTC7ForAppend2() throws Exception {
testTC7(true);
}
/**
* TC11: Racing rename
* @throws IOException an exception might be thrown
*/
@Test
public void testTC11() throws Exception {
final Path p = new Path("/TC11/foo");
private void testTC11(boolean appendToNewBlock) throws Exception {
final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file and write one block of data. Close file.
@ -251,7 +367,9 @@ public void testTC11() throws Exception {
}
//b. Reopen file in "append" mode. Append half block of data.
FSDataOutputStream out = fs.append(p);
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
final int len2 = (int)BLOCK_SIZE/2;
AppendTestUtil.write(out, len1, len2);
out.hflush();
@ -283,13 +401,21 @@ public void testTC11() throws Exception {
}
}
@Test
public void testTC11() throws Exception {
testTC11(false);
}
@Test
public void testTC11ForAppend2() throws Exception {
testTC11(true);
}
/**
* TC12: Append to partial CRC chunk
* @throws IOException an exception might be thrown
*/
@Test
public void testTC12() throws Exception {
final Path p = new Path("/TC12/foo");
private void testTC12(boolean appendToNewBlock) throws Exception {
final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with a block size of 64KB
@ -305,23 +431,43 @@ public void testTC12() throws Exception {
//b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
final int len2 = 5877;
{
FSDataOutputStream out = fs.append(p);
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//c. Reopen file and read 25687+5877 bytes of data from file. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
if (appendToNewBlock) {
LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
Assert.assertEquals(2, blks.getLocatedBlocks().size());
Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
AppendTestUtil.check(fs, p, 0, len1);
AppendTestUtil.check(fs, p, len1, len2);
}
}
/** Append to a partial CRC chunk and
* the first write does not fill up the partial CRC trunk
* *
* @throws IOException
*/
@Test
public void testAppendToPartialChunk() throws IOException {
final Path p = new Path("/partialChunk/foo");
public void testTC12() throws Exception {
testTC12(false);
}
@Test
public void testTC12ForAppend2() throws Exception {
testTC12(true);
}
/**
* Append to a partial CRC chunk and the first write does not fill up the
* partial CRC trunk
*/
private void testAppendToPartialChunk(boolean appendToNewBlock)
throws IOException {
final Path p = new Path("/partialChunk/foo"
+ (appendToNewBlock ? "0" : "1"));
final int fileLen = 513;
System.out.println("p=" + p);
@ -336,7 +482,9 @@ public void testAppendToPartialChunk() throws IOException {
System.out.println("Wrote 1 byte and closed the file " + p);
// append to file
stm = fs.append(p);
stm = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
// Append to a partial CRC trunk
stm.write(fileContents, 1, 1);
stm.hflush();
@ -345,7 +493,9 @@ public void testAppendToPartialChunk() throws IOException {
System.out.println("Append 1 byte and closed the file " + p);
// write the remainder of the file
stm = fs.append(p);
stm = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
// ensure getPos is set to reflect existing size of the file
assertEquals(2, stm.getPos());
@ -444,4 +594,14 @@ public void run() {
// if append was called with a stale file stat.
doSmallAppends(file, fs, 20);
}
@Test
public void testAppendToPartialChunk() throws IOException {
testAppendToPartialChunk(false);
}
@Test
public void testAppendToPartialChunkforAppend2() throws IOException {
testAppendToPartialChunk(true);
}
}

View File

@ -99,10 +99,11 @@ public void testAppendRestart() throws Exception {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
// OP_ADD to reopen file
// OP_APPEND to reopen file
// OP_ADD_BLOCK for second block
// OP_CLOSE to close file
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
@ -112,13 +113,14 @@ public void testAppendRestart() throws Exception {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
// OP_ADD to re-establish the lease
// OP_APPEND to re-establish the lease
// OP_UPDATE_BLOCKS from the updatePipeline call (increments genstamp of last block)
// OP_ADD_BLOCK at the start of the second block
// OP_CLOSE to close file
// Total: 2 OP_ADDs, 1 OP_UPDATE_BLOCKS, 2 OP_ADD_BLOCKs, and 2 OP_CLOSEs
// in addition to the ones above
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_UPDATE_BLOCKS).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);

View File

@ -31,7 +31,9 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;
@ -121,7 +123,66 @@ public void hSyncUpdateLength_00() throws IOException {
cluster.shutdown();
}
}
/**
* Test hsync with END_BLOCK flag.
*/
@Test
public void hSyncEndBlock_00() throws IOException {
final int preferredBlockSize = 1024;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataOutputStream stm = null;
try {
Path path = new Path("/" + fName);
stm = fileSystem.create(path, true, 4096, (short) 2,
AppendTestUtil.BLOCK_SIZE);
System.out.println("Created file " + path.toString());
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(0L, currentFileLength);
LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(0, blocks.getLocatedBlocks().size());
// write a block and call hsync(end_block) at the block boundary
stm.write(new byte[preferredBlockSize]);
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(1, blocks.getLocatedBlocks().size());
// call hsync then call hsync(end_block) immediately
stm.write(new byte[preferredBlockSize / 2]);
stm.hsync();
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2,
currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(2, blocks.getLocatedBlocks().size());
stm.write(new byte[preferredBlockSize / 4]);
stm.hsync();
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2
+ preferredBlockSize / 4, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(3, blocks.getLocatedBlocks().size());
} finally {
IOUtils.cleanup(null, stm, fileSystem);
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@ -133,6 +194,29 @@ public void hSyncUpdateLength_01() throws IOException {
(short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#END_BLOCK}.
*/
@Test
public void hSyncEndBlock_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#END_BLOCK} and
* {@link SyncFlag#UPDATE_LENGTH}.
*/
@Test
public void hSyncEndBlockAndUpdateLength() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@ -152,7 +236,20 @@ public void hSyncUpdateLength_02() throws IOException {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
@Test
public void hSyncEndBlock_02() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
// Modify defaul filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@ -173,7 +270,20 @@ public void hSyncUpdateLength_03() throws IOException {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
@Test
public void hSyncEndBlock_03() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
// Modify defaul filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The method starts new cluster with defined Configuration; creates a file
* with specified block_size and writes 10 equal sections in it; it also calls
@ -197,12 +307,13 @@ public static void doTheJob(Configuration conf, final String fileName,
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(replicas).build();
// Make sure we work with DFS in order to utilize all its functionality
DistributedFileSystem fileSystem =
cluster.getFileSystem();
DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataInputStream is;
try {
Path path = new Path(fileName);
final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
.toUri().getPath();
FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
block_size);
System.out.println("Created file " + fileName);
@ -210,7 +321,8 @@ public static void doTheJob(Configuration conf, final String fileName,
int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
for (int i=0; i<SECTIONS; i++) {
System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
System.out.println("Writing " + (tenth * i) + " to "
+ (tenth * (i + 1)) + " section to file " + fileName);
// write to the file
stm.write(fileContent, tenth * i, tenth);
@ -227,7 +339,11 @@ public static void doTheJob(Configuration conf, final String fileName,
assertEquals(
"File size doesn't match for hsync/hflush with updating the length",
tenth * (i + 1), currentFileLength);
} else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
assertEquals(i + 1, blocks.getLocatedBlocks().size());
}
byte [] toRead = new byte[tenth];
byte [] expected = new byte[tenth];
System.arraycopy(fileContent, tenth * i, expected, 0, tenth);

View File

@ -22,8 +22,10 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -40,6 +42,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Test;
@ -124,7 +127,8 @@ public void testBlockSynchronization() throws Exception {
}
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName,
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
// expire lease to trigger block recovery.
waitLeaseRecovery(cluster);

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -28,6 +29,7 @@
import org.junit.Test;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -234,7 +236,8 @@ public void testAppendIsDenied() throws IOException {
makeTestFile(path, BLOCK_SIZE, true);
try {
client.append(path.toString(), BUFFER_LENGTH, null, null).close();
client.append(path.toString(), BUFFER_LENGTH,
EnumSet.of(CreateFlag.APPEND), null, null).close();
fail("Append to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);

View File

@ -40,9 +40,12 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -99,7 +102,7 @@ public void testConcat() throws IOException, InterruptedException {
HdfsFileStatus fStatus;
FSDataInputStream stm;
String trg = new String("/trg");
String trg = "/trg";
Path trgPath = new Path(trg);
DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
fStatus = nn.getFileInfo(trg);
@ -112,7 +115,7 @@ public void testConcat() throws IOException, InterruptedException {
long [] lens = new long [numFiles];
int i = 0;
int i;
for(i=0; i<files.length; i++) {
files[i] = new Path("/file"+i);
Path path = files[i];
@ -385,6 +388,75 @@ public void testIllegalArg() throws IOException {
} catch (Exception e) {
// exspected
}
}
/**
* make sure we update the quota correctly after concat
*/
@Test
public void testConcatWithQuotaDecrease() throws IOException {
final short srcRepl = 3; // note this is different with REPL_FACTOR
final int srcNum = 10;
final Path foo = new Path("/foo");
final Path[] srcs = new Path[srcNum];
final Path target = new Path(foo, "target");
DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
for (int i = 0; i < srcNum; i++) {
srcs[i] = new Path(foo, "src" + i);
DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
}
ContentSummary summary = dfs.getContentSummary(foo);
Assert.assertEquals(11, summary.getFileCount());
Assert.assertEquals(blockSize * REPL_FACTOR +
blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
dfs.concat(target, srcs);
summary = dfs.getContentSummary(foo);
Assert.assertEquals(1, summary.getFileCount());
Assert.assertEquals(
blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
summary.getSpaceConsumed());
}
@Test
public void testConcatWithQuotaIncrease() throws IOException {
final short repl = 3;
final int srcNum = 10;
final Path foo = new Path("/foo");
final Path bar = new Path(foo, "bar");
final Path[] srcs = new Path[srcNum];
final Path target = new Path(bar, "target");
DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L);
final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR;
dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota);
for (int i = 0; i < srcNum; i++) {
srcs[i] = new Path(bar, "src" + i);
DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L);
}
ContentSummary summary = dfs.getContentSummary(bar);
Assert.assertEquals(11, summary.getFileCount());
Assert.assertEquals(dsQuota, summary.getSpaceConsumed());
try {
dfs.concat(target, srcs);
fail("QuotaExceededException expected");
} catch (RemoteException e) {
Assert.assertTrue(
e.unwrapRemoteException() instanceof QuotaExceededException);
}
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
dfs.concat(target, srcs);
summary = dfs.getContentSummary(bar);
Assert.assertEquals(1, summary.getFileCount());
Assert.assertEquals(blockSize * repl * (srcNum + 1),
summary.getSpaceConsumed());
}
}

View File

@ -232,14 +232,18 @@ public void testAppend() throws Exception {
// Retried append requests succeed
newCall();
LastBlockWithStatus b = nnRpc.append(src, "holder");
Assert.assertEquals(b, nnRpc.append(src, "holder"));
Assert.assertEquals(b, nnRpc.append(src, "holder"));
LastBlockWithStatus b = nnRpc.append(src, "holder",
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
Assert.assertEquals(b, nnRpc.append(src, "holder",
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
Assert.assertEquals(b, nnRpc.append(src, "holder",
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
// non-retried call fails
newCall();
try {
nnRpc.append(src, "holder");
nnRpc.append(src, "holder",
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
Assert.fail("testAppend - expected exception is not thrown");
} catch (Exception e) {
// Expected
@ -409,7 +413,7 @@ public void testRetryCacheRebuild() throws Exception {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(24, cacheSet.size());
assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -428,7 +432,7 @@ public void testRetryCacheRebuild() throws Exception {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(24, cacheSet.size());
assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();

View File

@ -163,7 +163,7 @@ public void testRetryCacheOnStandbyNN() throws Exception {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(24, cacheSet.size());
assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -184,7 +184,7 @@ public void testRetryCacheOnStandbyNN() throws Exception {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(24, cacheSet.size());
assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@ -438,7 +438,8 @@ void prepare() throws Exception {
@Override
void invoke() throws Exception {
lbk = client.getNamenode().append(fileName, client.getClientName());
lbk = client.getNamenode().append(fileName, client.getClientName(),
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
}
// check if the inode of the file is under construction
@ -701,7 +702,8 @@ void prepare() throws Exception {
final Path filePath = new Path(file);
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
// append to the file and leave the last block under construction
out = this.client.append(file, BlockSize, null, null);
out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
null, null);
byte[] appendContent = new byte[100];
new Random().nextBytes(appendContent);
out.write(appendContent);