HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2015-03-13 14:13:55 -05:00
parent f446669afb
commit d324164a51
26 changed files with 836 additions and 379 deletions

View File

@ -743,6 +743,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7491. Add incremental blockreport latency to DN metrics.
(Ming Ma via cnauroth)
HDFS-7435. PB encoding of block reports is very inefficient.
(Daryn Sharp via kihwal)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -17,342 +17,458 @@
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
/**
* This class provides an interface for accessing list of blocks that
* has been implemented as long[].
* This class is useful for block report. Rather than send block reports
* as a Block[] we can send it as a long[].
*
* The structure of the array is as follows:
* 0: the length of the finalized replica list;
* 1: the length of the under-construction replica list;
* - followed by finalized replica list where each replica is represented by
* 3 longs: one for the blockId, one for the block length, and one for
* the generation stamp;
* - followed by the invalid replica represented with three -1s;
* - followed by the under-construction replica list where each replica is
* represented by 4 longs: three for the block id, length, generation
* stamp, and the fourth for the replica state.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockListAsLongs implements Iterable<Block> {
/**
* A finalized block as 3 longs
* block-id and block length and generation stamp
*/
private static final int LONGS_PER_FINALIZED_BLOCK = 3;
/**
* An under-construction block as 4 longs
* block-id and block length, generation stamp and replica state
*/
private static final int LONGS_PER_UC_BLOCK = 4;
/** Number of longs in the header */
private static final int HEADER_SIZE = 2;
/**
* Returns the index of the first long in blockList
* belonging to the specified block.
* The first long contains the block id.
*/
private int index2BlockId(int blockIndex) {
if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
return -1;
int finalizedSize = getNumberOfFinalizedReplicas();
if(blockIndex < finalizedSize)
return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
}
private final long[] blockList;
/**
* Create block report from finalized and under construction lists of blocks.
*
* @param finalized - list of finalized blocks
* @param uc - list of under construction blocks
*/
public BlockListAsLongs(final List<? extends Replica> finalized,
final List<? extends Replica> uc) {
int finalizedSize = finalized == null ? 0 : finalized.size();
int ucSize = uc == null ? 0 : uc.size();
int len = HEADER_SIZE
+ (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+ ucSize * LONGS_PER_UC_BLOCK;
blockList = new long[len];
// set the header
blockList[0] = finalizedSize;
blockList[1] = ucSize;
// set finalized blocks
for (int i = 0; i < finalizedSize; i++) {
setBlock(i, finalized.get(i));
}
// set invalid delimiting block
setDelimitingBlock(finalizedSize);
// set under construction blocks
for (int i = 0; i < ucSize; i++) {
setBlock(finalizedSize + i, uc.get(i));
}
}
/**
* Create block report from a list of finalized blocks. Used by
* NNThroughputBenchmark.
*
* @param blocks - list of finalized blocks
*/
public BlockListAsLongs(final List<? extends Block> blocks) {
int finalizedSize = blocks == null ? 0 : blocks.size();
int len = HEADER_SIZE
+ (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK;
blockList = new long[len];
// set the header
blockList[0] = finalizedSize;
blockList[1] = 0;
// set finalized blocks
for (int i = 0; i < finalizedSize; i++) {
setBlock(i, blocks.get(i));
}
// set invalid delimiting block
setDelimitingBlock(finalizedSize);
}
public BlockListAsLongs() {
this((long[])null);
}
/**
* Constructor
* @param iBlockList - BlockListALongs create from this long[] parameter
*/
public BlockListAsLongs(final long[] iBlockList) {
if (iBlockList == null) {
blockList = new long[HEADER_SIZE];
return;
}
blockList = iBlockList;
}
public long[] getBlockListAsLongs() {
return blockList;
}
/**
* Iterates over blocks in the block report.
* Avoids object allocation on each iteration.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockReportIterator implements Iterator<Block> {
private int currentBlockIndex;
private final Block block;
private ReplicaState currentReplicaState;
BlockReportIterator() {
this.currentBlockIndex = 0;
this.block = new Block();
this.currentReplicaState = null;
}
public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
private final static int CHUNK_SIZE = 64*1024; // 64K
private static long[] EMPTY_LONGS = new long[]{0, 0};
public static BlockListAsLongs EMPTY = new BlockListAsLongs() {
@Override
public boolean hasNext() {
return currentBlockIndex < getNumberOfBlocks();
public int getNumberOfBlocks() {
return 0;
}
@Override
public Block next() {
block.set(blockId(currentBlockIndex),
blockLength(currentBlockIndex),
blockGenerationStamp(currentBlockIndex));
currentReplicaState = blockReplicaState(currentBlockIndex);
currentBlockIndex++;
return block;
public ByteString getBlocksBuffer() {
return ByteString.EMPTY;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
public long[] getBlockListAsLongs() {
return EMPTY_LONGS;
}
@Override
public Iterator<BlockReportReplica> iterator() {
return Collections.emptyIterator();
}
};
/**
* Get the state of the current replica.
* The state corresponds to the replica returned
* by the latest {@link #next()}.
*/
public ReplicaState getCurrentReplicaState() {
return currentReplicaState;
}
/**
* Prepare an instance to in-place decode the given ByteString buffer
* @param numBlocks - blocks in the buffer
* @param blocksBuf - ByteString encoded varints
* @return BlockListAsLongs
*/
public static BlockListAsLongs decodeBuffer(final int numBlocks,
final ByteString blocksBuf) {
return new BufferDecoder(numBlocks, blocksBuf);
}
/**
* Returns an iterator over blocks in the block report.
* Prepare an instance to in-place decode the given ByteString buffers
* @param numBlocks - blocks in the buffers
* @param blocksBufs - list of ByteString encoded varints
* @return BlockListAsLongs
*/
@Override
public Iterator<Block> iterator() {
return getBlockReportIterator();
public static BlockListAsLongs decodeBuffers(final int numBlocks,
final List<ByteString> blocksBufs) {
// this doesn't actually copy the data
return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
}
/**
* Returns {@link BlockReportIterator}.
* Prepare an instance to in-place decode the given list of Longs. Note
* it's much more efficient to decode ByteString buffers and only exists
* for compatibility.
* @param blocksList - list of longs
* @return BlockListAsLongs
*/
public BlockReportIterator getBlockReportIterator() {
return new BlockReportIterator();
public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
}
/**
* Prepare an instance to encode the collection of replicas into an
* efficient ByteString.
* @param replicas - replicas to encode
* @return BlockListAsLongs
*/
public static BlockListAsLongs encode(
final Collection<? extends Replica> replicas) {
BlockListAsLongs.Builder builder = builder();
for (Replica replica : replicas) {
builder.add(replica);
}
return builder.build();
}
public static Builder builder() {
return new BlockListAsLongs.Builder();
}
/**
* The number of blocks
* @return - the number of blocks
*/
public int getNumberOfBlocks() {
assert blockList.length == HEADER_SIZE +
(blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
blockList[1] * LONGS_PER_UC_BLOCK :
"Number of blocks is inconcistent with the array length";
return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
}
abstract public int getNumberOfBlocks();
/**
* Returns the number of finalized replicas in the block report.
* Very efficient encoding of the block report into a ByteString to avoid
* the overhead of protobuf repeating fields. Primitive repeating fields
* require re-allocs of an ArrayList<Long> and the associated (un)boxing
* overhead which puts pressure on GC.
*
* The structure of the buffer is as follows:
* - each replica is represented by 4 longs:
* blockId, block length, genstamp, replica state
*
* @return ByteString encoded block report
*/
private int getNumberOfFinalizedReplicas() {
return (int)blockList[0];
}
abstract public ByteString getBlocksBuffer();
/**
* Returns the number of under construction replicas in the block report.
* List of ByteStrings that encode this block report
*
* @return ByteStrings
*/
private int getNumberOfUCReplicas() {
return (int)blockList[1];
}
/**
* Returns the id of the specified replica of the block report.
*/
private long blockId(int index) {
return blockList[index2BlockId(index)];
}
/**
* Returns the length of the specified replica of the block report.
*/
private long blockLength(int index) {
return blockList[index2BlockId(index) + 1];
}
/**
* Returns the generation stamp of the specified replica of the block report.
*/
private long blockGenerationStamp(int index) {
return blockList[index2BlockId(index) + 2];
}
/**
* Returns the state of the specified replica of the block report.
*/
private ReplicaState blockReplicaState(int index) {
if(index < getNumberOfFinalizedReplicas())
return ReplicaState.FINALIZED;
return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
}
/**
* Corrupt the generation stamp of the block with the given index.
* Not meant to be used outside of tests.
*/
@VisibleForTesting
public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
long oldGS = blockList[index2BlockId(blockIndex) + 2];
while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
}
return oldGS;
}
/**
* Corrupt the length of the block with the given index by truncation.
* Not meant to be used outside of tests.
*/
@VisibleForTesting
public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
long oldLength = blockList[index2BlockId(blockIndex) + 1];
blockList[index2BlockId(blockIndex) + 1] =
rand.nextInt((int) oldLength - 1);
return oldLength;
}
/**
* Set the indexTh block
* @param index - the index of the block to set
* @param r - the block is set to the value of the this Replica
*/
private void setBlock(final int index, final Replica r) {
int pos = index2BlockId(index);
blockList[pos] = r.getBlockId();
blockList[pos + 1] = r.getNumBytes();
blockList[pos + 2] = r.getGenerationStamp();
if(index < getNumberOfFinalizedReplicas())
return;
assert r.getState() != ReplicaState.FINALIZED :
"Must be under-construction replica.";
blockList[pos + 3] = r.getState().getValue();
}
/**
* Set the indexTh block
* @param index - the index of the block to set
* @param b - the block is set to the value of the this Block
*/
private void setBlock(final int index, final Block b) {
int pos = index2BlockId(index);
blockList[pos] = b.getBlockId();
blockList[pos + 1] = b.getNumBytes();
blockList[pos + 2] = b.getGenerationStamp();
}
/**
* Set the invalid delimiting block between the finalized and
* the under-construction lists.
* The invalid block has all three fields set to -1.
* @param finalizedSzie - the size of the finalized list
*/
private void setDelimitingBlock(final int finalizedSzie) {
int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
blockList[idx] = -1;
blockList[idx+1] = -1;
blockList[idx+2] = -1;
}
public long getMaxGsInBlockList() {
long maxGs = -1;
Iterator<Block> iter = getBlockReportIterator();
while (iter.hasNext()) {
Block b = iter.next();
if (b.getGenerationStamp() > maxGs) {
maxGs = b.getGenerationStamp();
public List<ByteString> getBlocksBuffers() {
final ByteString blocksBuf = getBlocksBuffer();
final List<ByteString> buffers;
final int size = blocksBuf.size();
if (size <= CHUNK_SIZE) {
buffers = Collections.singletonList(blocksBuf);
} else {
buffers = new ArrayList<ByteString>();
for (int pos=0; pos < size; pos += CHUNK_SIZE) {
// this doesn't actually copy the data
buffers.add(blocksBuf.substring(pos, Math.min(pos+CHUNK_SIZE, size)));
}
}
return maxGs;
return buffers;
}
/**
* Convert block report to old-style list of longs. Only used to
* re-encode the block report when the DN detects an older NN. This is
* inefficient, but in practice a DN is unlikely to be upgraded first
*
* The structure of the array is as follows:
* 0: the length of the finalized replica list;
* 1: the length of the under-construction replica list;
* - followed by finalized replica list where each replica is represented by
* 3 longs: one for the blockId, one for the block length, and one for
* the generation stamp;
* - followed by the invalid replica represented with three -1s;
* - followed by the under-construction replica list where each replica is
* represented by 4 longs: three for the block id, length, generation
* stamp, and the fourth for the replica state.
* @return list of longs
*/
abstract public long[] getBlockListAsLongs();
/**
* Returns a singleton iterator over blocks in the block report. Do not
* add the returned blocks to a collection.
* @return Iterator
*/
abstract public Iterator<BlockReportReplica> iterator();
public static class Builder {
private final ByteString.Output out;
private final CodedOutputStream cos;
private int numBlocks = 0;
private int numFinalized = 0;
Builder() {
out = ByteString.newOutput(64*1024);
cos = CodedOutputStream.newInstance(out);
}
public void add(Replica replica) {
try {
// zig-zag to reduce size of legacy blocks
cos.writeSInt64NoTag(replica.getBlockId());
cos.writeRawVarint64(replica.getBytesOnDisk());
cos.writeRawVarint64(replica.getGenerationStamp());
ReplicaState state = replica.getState();
// although state is not a 64-bit value, using a long varint to
// allow for future use of the upper bits
cos.writeRawVarint64(state.getValue());
if (state == ReplicaState.FINALIZED) {
numFinalized++;
}
numBlocks++;
} catch (IOException ioe) {
// shouldn't happen, ByteString.Output doesn't throw IOE
throw new IllegalStateException(ioe);
}
}
public int getNumberOfBlocks() {
return numBlocks;
}
public BlockListAsLongs build() {
try {
cos.flush();
} catch (IOException ioe) {
// shouldn't happen, ByteString.Output doesn't throw IOE
throw new IllegalStateException(ioe);
}
return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
}
}
// decode new-style ByteString buffer based block report
private static class BufferDecoder extends BlockListAsLongs {
// reserve upper bits for future use. decoding masks off these bits to
// allow compatibility for the current through future release that may
// start using the bits
private static long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
private static long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
private final ByteString buffer;
private final int numBlocks;
private int numFinalized;
BufferDecoder(final int numBlocks, final ByteString buf) {
this(numBlocks, -1, buf);
}
BufferDecoder(final int numBlocks, final int numFinalized,
final ByteString buf) {
this.numBlocks = numBlocks;
this.numFinalized = numFinalized;
this.buffer = buf;
}
@Override
public int getNumberOfBlocks() {
return numBlocks;
}
@Override
public ByteString getBlocksBuffer() {
return buffer;
}
@Override
public long[] getBlockListAsLongs() {
// terribly inefficient but only occurs if server tries to transcode
// an undecoded buffer into longs - ie. it will never happen but let's
// handle it anyway
if (numFinalized == -1) {
int n = 0;
for (Replica replica : this) {
if (replica.getState() == ReplicaState.FINALIZED) {
n++;
}
}
numFinalized = n;
}
int numUc = numBlocks - numFinalized;
int size = 2 + 3*(numFinalized+1) + 4*(numUc);
long[] longs = new long[size];
longs[0] = numFinalized;
longs[1] = numUc;
int idx = 2;
int ucIdx = idx + 3*numFinalized;
// delimiter block
longs[ucIdx++] = -1;
longs[ucIdx++] = -1;
longs[ucIdx++] = -1;
for (BlockReportReplica block : this) {
switch (block.getState()) {
case FINALIZED: {
longs[idx++] = block.getBlockId();
longs[idx++] = block.getNumBytes();
longs[idx++] = block.getGenerationStamp();
break;
}
default: {
longs[ucIdx++] = block.getBlockId();
longs[ucIdx++] = block.getNumBytes();
longs[ucIdx++] = block.getGenerationStamp();
longs[ucIdx++] = block.getState().getValue();
break;
}
}
}
return longs;
}
@Override
public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() {
final BlockReportReplica block = new BlockReportReplica();
final CodedInputStream cis = buffer.newCodedInput();
private int currentBlockIndex = 0;
@Override
public boolean hasNext() {
return currentBlockIndex < numBlocks;
}
@Override
public BlockReportReplica next() {
currentBlockIndex++;
try {
// zig-zag to reduce size of legacy blocks and mask off bits
// we don't (yet) understand
block.setBlockId(cis.readSInt64());
block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK);
block.setGenerationStamp(cis.readRawVarint64());
long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
block.setState(ReplicaState.getState((int)state));
} catch (IOException e) {
throw new IllegalStateException(e);
}
return block;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
// decode old style block report of longs
private static class LongsDecoder extends BlockListAsLongs {
private final List<Long> values;
private final int finalizedBlocks;
private final int numBlocks;
// set the header
LongsDecoder(List<Long> values) {
this.values = values.subList(2, values.size());
this.finalizedBlocks = values.get(0).intValue();
this.numBlocks = finalizedBlocks + values.get(1).intValue();
}
@Override
public int getNumberOfBlocks() {
return numBlocks;
}
@Override
public ByteString getBlocksBuffer() {
Builder builder = builder();
for (Replica replica : this) {
builder.add(replica);
}
return builder.build().getBlocksBuffer();
}
@Override
public long[] getBlockListAsLongs() {
long[] longs = new long[2+values.size()];
longs[0] = finalizedBlocks;
longs[1] = numBlocks - finalizedBlocks;
for (int i=0; i < longs.length; i++) {
longs[i] = values.get(i);
}
return longs;
}
@Override
public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() {
private final BlockReportReplica block = new BlockReportReplica();
final Iterator<Long> iter = values.iterator();
private int currentBlockIndex = 0;
@Override
public boolean hasNext() {
return currentBlockIndex < numBlocks;
}
@Override
public BlockReportReplica next() {
if (currentBlockIndex == finalizedBlocks) {
// verify the presence of the delimiter block
readBlock();
Preconditions.checkArgument(block.getBlockId() == -1 &&
block.getNumBytes() == -1 &&
block.getGenerationStamp() == -1,
"Invalid delimiter block");
}
readBlock();
if (currentBlockIndex++ < finalizedBlocks) {
block.setState(ReplicaState.FINALIZED);
} else {
block.setState(ReplicaState.getState(iter.next().intValue()));
}
return block;
}
private void readBlock() {
block.setBlockId(iter.next());
block.setNumBytes(iter.next());
block.setGenerationStamp(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
@InterfaceAudience.Private
public static class BlockReportReplica extends Block implements Replica {
private ReplicaState state;
private BlockReportReplica() {
}
public BlockReportReplica(Block block) {
super(block);
if (block instanceof BlockReportReplica) {
this.state = ((BlockReportReplica)block).getState();
} else {
this.state = ReplicaState.FINALIZED;
}
}
public void setState(ReplicaState state) {
this.state = state;
}
@Override
public ReplicaState getState() {
return state;
}
@Override
public long getBytesOnDisk() {
return getNumBytes();
}
@Override
public long getVisibleLength() {
throw new UnsupportedOperationException();
}
@Override
public String getStorageUuid() {
throw new UnsupportedOperationException();
}
@Override
public boolean isOnTransientStorage() {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o) {
return super.equals(o);
}
@Override
public int hashCode() {
return super.hashCode();
}
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -51,6 +52,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -64,6 +66,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -83,6 +86,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
VersionRequestProto.newBuilder().build();
private final static RpcController NULL_CONTROLLER = null;
@VisibleForTesting
public DatanodeProtocolClientSideTranslatorPB(DatanodeProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
@ -166,12 +174,20 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
boolean useBlocksBuffer = registration.getNamespaceInfo()
.isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
for (StorageBlockReport r : reports) {
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
.newBuilder().setStorage(PBHelper.convert(r.getStorage()));
long[] blocks = r.getBlocks();
for (int i = 0; i < blocks.length; i++) {
reportBuilder.addBlocks(blocks[i]);
BlockListAsLongs blocks = r.getBlocks();
if (useBlocksBuffer) {
reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
} else {
for (long value : blocks.getBlockListAsLongs()) {
reportBuilder.addBlocks(value);
}
}
builder.addReports(reportBuilder.build());
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
@ -58,6 +59,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -145,10 +147,14 @@ public BlockReportResponseProto blockReport(RpcController controller,
int index = 0;
for (StorageBlockReportProto s : request.getReportsList()) {
List<Long> blockIds = s.getBlocksList();
long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) {
blocks[i] = blockIds.get(i);
final BlockListAsLongs blocks;
if (s.hasNumberOfBlocks()) { // new style buffer based reports
int num = (int)s.getNumberOfBlocks();
Preconditions.checkState(s.getBlocksCount() == 0,
"cannot send both blocks list and buffers");
blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
} else {
blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
}
report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
blocks);

View File

@ -573,7 +573,7 @@ public static NamespaceInfo convert(NamespaceInfoProto info) {
StorageInfoProto storage = info.getStorageInfo();
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
info.getSoftwareVersion());
info.getSoftwareVersion(), info.getCapabilities());
}
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@ -1233,7 +1233,9 @@ public static NamespaceInfoProto convert(NamespaceInfo info) {
.setBuildVersion(info.getBuildVersion())
.setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info))
.setSoftwareVersion(info.getSoftwareVersion()).build();
.setSoftwareVersion(info.getSoftwareVersion())
.setCapabilities(info.getCapabilities())
.build();
}
// Located Block Arrays and Lists

View File

@ -47,7 +47,7 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -1968,11 +1968,9 @@ private void processFirstBlockReport(
if (report == null) return;
assert (namesystem.hasWriteLock());
assert (storageInfo.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState reportedState = itBR.getCurrentReplicaState();
for (BlockReportReplica iblk : report) {
ReplicaState reportedState = iblk.getState();
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
@ -2042,13 +2040,11 @@ private void reportDiff(DatanodeStorageInfo storageInfo,
int curIndex;
if (newReport == null) {
newReport = new BlockListAsLongs();
newReport = BlockListAsLongs.EMPTY;
}
// scan the report and process newly reported blocks
BlockReportIterator itBR = newReport.getBlockReportIterator();
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);

View File

@ -228,7 +228,7 @@ private void connectToNNAndHandshake() throws IOException {
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
register();
register(nsInfo);
}
// This is useful to make sure NN gets Heartbeat before Blockreport
@ -468,8 +468,7 @@ List<DatanodeCommand> blockReport() throws IOException {
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
BlockListAsLongs blockList = kvPair.getValue();
reports[i++] = new StorageBlockReport(
kvPair.getKey(), blockList.getBlockListAsLongs());
reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
totalBlockCount += blockList.getNumberOfBlocks();
}
@ -774,10 +773,11 @@ private void offerService() throws Exception {
*
* issued by the namenode to recognize registered datanodes.
*
* @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
void register() throws IOException {
void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
bpRegistration = bpos.createRegistration();
@ -788,6 +788,7 @@ void register() throws IOException {
try {
// Use returned registration from namenode with updated fields
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
bpRegistration.setNamespaceInfo(nsInfo);
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
@ -915,9 +916,9 @@ void reRegister() throws IOException {
if (shouldRun()) {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
retrieveNamespaceInfo();
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// and re-register
register();
register(nsInfo);
scheduleHeartbeat();
}
}

View File

@ -1575,30 +1575,26 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
new HashMap<DatanodeStorage, BlockListAsLongs>();
Map<String, ArrayList<ReplicaInfo>> finalized =
new HashMap<String, ArrayList<ReplicaInfo>>();
Map<String, ArrayList<ReplicaInfo>> uc =
new HashMap<String, ArrayList<ReplicaInfo>>();
Map<String, BlockListAsLongs.Builder> builders =
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = getVolumes();
for (FsVolumeSpi v : curVolumes) {
finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
builders.put(v.getStorageID(), BlockListAsLongs.builder());
}
synchronized(this) {
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
switch(b.getState()) {
case FINALIZED:
finalized.get(b.getVolume().getStorageID()).add(b);
break;
case RBW:
case RWR:
uc.get(b.getVolume().getStorageID()).add(b);
builders.get(b.getVolume().getStorageID()).add(b);
break;
case RUR:
ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
builders.get(rur.getVolume().getStorageID())
.add(rur.getOriginalReplica());
break;
case TEMPORARY:
break;
@ -1609,10 +1605,8 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
}
for (FsVolumeImpl v : curVolumes) {
ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
blockReportsMap.put(v.toDatanodeStorage(),
new BlockListAsLongs(finalizedList, ucList));
builders.get(v.getStorageID()).build());
}
return blockReportsMap;

View File

@ -1302,7 +1302,7 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
for(StorageBlockReport r : reports) {
final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
final BlockListAsLongs blocks = r.getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last

View File

@ -40,6 +40,7 @@ public class DatanodeRegistration extends DatanodeID
private final StorageInfo storageInfo;
private ExportedBlockKeys exportedKeys;
private final String softwareVersion;
private NamespaceInfo nsInfo;
@VisibleForTesting
public DatanodeRegistration(String uuid, DatanodeRegistration dnr) {
@ -78,6 +79,14 @@ public int getVersion() {
return storageInfo.getLayoutVersion();
}
public void setNamespaceInfo(NamespaceInfo nsInfo) {
this.nsInfo = nsInfo;
}
public NamespaceInfo getNamespaceInfo() {
return nsInfo;
}
@Override // NodeRegistration
public String getRegistrationID() {
return Storage.getRegistrationID(storageInfo);

View File

@ -29,6 +29,9 @@
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* NamespaceInfo is returned by the name-node in reply
* to a data-node handshake.
@ -40,19 +43,52 @@ public class NamespaceInfo extends StorageInfo {
final String buildVersion;
String blockPoolID = ""; // id of the block pool
String softwareVersion;
long capabilities;
// only authoritative on the server-side to determine advertisement to
// clients. enum will update the supported values
private static long CAPABILITIES_SUPPORTED = 0;
public enum Capability {
UNKNOWN(false),
STORAGE_BLOCK_REPORT_BUFFERS(true); // use optimized ByteString buffers
private final long mask;
Capability(boolean isSupported) {
int bits = ordinal() - 1;
mask = (bits < 0) ? 0 : (1L << bits);
if (isSupported) {
CAPABILITIES_SUPPORTED |= mask;
}
}
public long getMask() {
return mask;
}
}
// defaults to enabled capabilites since this ctor is for server
public NamespaceInfo() {
super(NodeType.NAME_NODE);
buildVersion = null;
capabilities = CAPABILITIES_SUPPORTED;
}
// defaults to enabled capabilites since this ctor is for server
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, String buildVersion, String softwareVersion) {
this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
CAPABILITIES_SUPPORTED);
}
// for use by server and/or client
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, String buildVersion, String softwareVersion,
long capabilities) {
super(HdfsConstants.NAMENODE_LAYOUT_VERSION, nsID, clusterID, cT,
NodeType.NAME_NODE);
blockPoolID = bpID;
this.buildVersion = buildVersion;
this.softwareVersion = softwareVersion;
this.capabilities = capabilities;
}
public NamespaceInfo(int nsID, String clusterID, String bpID,
@ -61,6 +97,22 @@ public NamespaceInfo(int nsID, String clusterID, String bpID,
VersionInfo.getVersion());
}
public long getCapabilities() {
return capabilities;
}
@VisibleForTesting
public void setCapabilities(long capabilities) {
this.capabilities = capabilities;
}
public boolean isCapabilitySupported(Capability capability) {
Preconditions.checkArgument(capability != Capability.UNKNOWN,
"cannot test for unknown capability");
long mask = capability.getMask();
return (capabilities & mask) == mask;
}
public String getBuildVersion() {
return buildVersion;
}

View File

@ -18,14 +18,16 @@
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
/**
* Block report for a Datanode storage
*/
public class StorageBlockReport {
private final DatanodeStorage storage;
private final long[] blocks;
private final BlockListAsLongs blocks;
public StorageBlockReport(DatanodeStorage storage, long[] blocks) {
public StorageBlockReport(DatanodeStorage storage, BlockListAsLongs blocks) {
this.storage = storage;
this.blocks = blocks;
}
@ -34,7 +36,7 @@ public DatanodeStorage getStorage() {
return storage;
}
public long[] getBlocks() {
public BlockListAsLongs getBlocks() {
return blocks;
}
}

View File

@ -237,6 +237,8 @@ message BlockReportRequestProto {
message StorageBlockReportProto {
required DatanodeStorageProto storage = 1; // Storage
repeated uint64 blocks = 2 [packed=true];
optional uint64 numberOfBlocks = 3;
repeated bytes blocksBuffers = 4;
}
/**

View File

@ -517,6 +517,7 @@ message NamespaceInfoProto {
required string blockPoolID = 3; // block pool used by the namespace
required StorageInfoProto storageInfo = 4;// Node information
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
optional uint64 capabilities = 6 [default = 0]; // feature flags
}
/**

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class TestBlockListAsLongs {
static Block b1 = new Block(1, 11, 111);
static Block b2 = new Block(2, 22, 222);
static Block b3 = new Block(3, 33, 333);
static Block b4 = new Block(4, 44, 444);
@Test
public void testEmptyReport() {
BlockListAsLongs blocks = checkReport();
assertArrayEquals(
new long[] {
0, 0,
-1, -1, -1 },
blocks.getBlockListAsLongs());
}
@Test
public void testFinalized() {
BlockListAsLongs blocks = checkReport(
new FinalizedReplica(b1, null, null));
assertArrayEquals(
new long[] {
1, 0,
1, 11, 111,
-1, -1, -1 },
blocks.getBlockListAsLongs());
}
@Test
public void testUc() {
BlockListAsLongs blocks = checkReport(
new ReplicaBeingWritten(b1, null, null, null));
assertArrayEquals(
new long[] {
0, 1,
-1, -1, -1,
1, 11, 111, ReplicaState.RBW.getValue() },
blocks.getBlockListAsLongs());
}
@Test
public void testMix() {
BlockListAsLongs blocks = checkReport(
new FinalizedReplica(b1, null, null),
new FinalizedReplica(b2, null, null),
new ReplicaBeingWritten(b3, null, null, null),
new ReplicaWaitingToBeRecovered(b4, null, null));
assertArrayEquals(
new long[] {
2, 2,
1, 11, 111,
2, 22, 222,
-1, -1, -1,
3, 33, 333, ReplicaState.RBW.getValue(),
4, 44, 444, ReplicaState.RWR.getValue() },
blocks.getBlockListAsLongs());
}
@Test
public void testFuzz() throws InterruptedException {
Replica[] replicas = new Replica[100000];
Random rand = new Random(0);
for (int i=0; i<replicas.length; i++) {
Block b = new Block(rand.nextLong(), i, i<<4);
switch (rand.nextInt(2)) {
case 0:
replicas[i] = new FinalizedReplica(b, null, null);
break;
case 1:
replicas[i] = new ReplicaBeingWritten(b, null, null, null);
break;
case 2:
replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
break;
}
}
checkReport(replicas);
}
private BlockListAsLongs checkReport(Replica...replicas) {
Map<Long, Replica> expectedReplicas = new HashMap<>();
for (Replica replica : replicas) {
expectedReplicas.put(replica.getBlockId(), replica);
}
expectedReplicas = Collections.unmodifiableMap(expectedReplicas);
// encode the blocks and extract the buffers
BlockListAsLongs blocks =
BlockListAsLongs.encode(expectedReplicas.values());
List<ByteString> buffers = blocks.getBlocksBuffers();
// convert to old-style list of longs
List<Long> longs = new ArrayList<Long>();
for (long value : blocks.getBlockListAsLongs()) {
longs.add(value);
}
// decode the buffers and verify its contents
BlockListAsLongs decodedBlocks =
BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers);
checkReplicas(expectedReplicas, decodedBlocks);
// decode the long and verify its contents
BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs);
checkReplicas(expectedReplicas, decodedList);
return blocks;
}
private void checkReplicas(Map<Long,Replica> expectedReplicas,
BlockListAsLongs decodedBlocks) {
assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());
Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas);
for (BlockReportReplica replica : decodedBlocks) {
assertNotNull(replica);
Replica expected = reportReplicas.remove(replica.getBlockId());
assertNotNull(expected);
assertEquals("wrong bytes",
expected.getNumBytes(), replica.getNumBytes());
assertEquals("wrong genstamp",
expected.getGenerationStamp(), replica.getGenerationStamp());
assertEquals("wrong replica state",
expected.getState(), replica.getState());
}
assertTrue(reportReplicas.isEmpty());
}
@Test
public void testDatanodeDetect() throws ServiceException, IOException {
final AtomicReference<BlockReportRequestProto> request =
new AtomicReference<>();
// just capture the outgoing PB
DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class);
doAnswer(new Answer<BlockReportResponseProto>() {
public BlockReportResponseProto answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
request.set((BlockReportRequestProto) args[1]);
return BlockReportResponseProto.newBuilder().build();
}
}).when(mockProxy).blockReport(any(RpcController.class),
any(BlockReportRequestProto.class));
@SuppressWarnings("resource")
DatanodeProtocolClientSideTranslatorPB nn =
new DatanodeProtocolClientSideTranslatorPB(mockProxy);
DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration();
NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1);
reg.setNamespaceInfo(nsInfo);
Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null);
BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r));
DatanodeStorage storage = new DatanodeStorage("s1");
StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) };
// check DN sends new-style BR
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr);
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty());
// back up to prior version and check DN sends old-style BR
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr);
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());
assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty());
}
}

View File

@ -555,12 +555,12 @@ public void testSafeModeIBR() throws Exception {
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@ -571,7 +571,7 @@ public void testSafeModeIBR() throws Exception {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
BlockListAsLongs.EMPTY);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@ -600,7 +600,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
}

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
@ -146,22 +147,32 @@ private static StorageBlockReport[] getBlockReports(
// Walk the list of blocks until we find one each to corrupt the
// generation stamp and length, if so requested.
for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
for (BlockReportReplica block : blockList) {
if (corruptOneBlockGs && !corruptedGs) {
blockList.corruptBlockGSForTesting(i, rand);
LOG.info("Corrupted the GS for block ID " + i);
long gsOld = block.getGenerationStamp();
long gsNew;
do {
gsNew = rand.nextInt();
} while (gsNew == gsOld);
block.setGenerationStamp(gsNew);
LOG.info("Corrupted the GS for block ID " + block);
corruptedGs = true;
} else if (corruptOneBlockLen && !corruptedLen) {
blockList.corruptBlockLengthForTesting(i, rand);
LOG.info("Corrupted the length for block ID " + i);
long lenOld = block.getNumBytes();
long lenNew;
do {
lenNew = rand.nextInt((int)lenOld - 1);
} while (lenNew == lenOld);
block.setNumBytes(lenNew);
LOG.info("Corrupted the length for block ID " + block);
corruptedLen = true;
} else {
break;
}
builder.add(new BlockReportReplica(block));
}
reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
new StorageBlockReport(dnStorage, builder.build());
}
return reports;

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -271,7 +270,7 @@ synchronized public long getVisibleLength() {
@Override
public ReplicaState getState() {
return null;
return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW;
}
@Override
@ -529,7 +528,7 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) {
}
public synchronized void injectBlocks(String bpid,
Iterable<Block> injectBlocks) throws IOException {
Iterable<? extends Block> injectBlocks) throws IOException {
ExtendedBlock blk = new ExtendedBlock();
if (injectBlocks != null) {
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@ -582,16 +581,16 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
}
synchronized BlockListAsLongs getBlockReport(String bpid) {
final List<Replica> blocks = new ArrayList<Replica>();
BlockListAsLongs.Builder report = BlockListAsLongs.builder();
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
for (BInfo b : map.values()) {
if (b.isFinalized()) {
blocks.add(b);
report.add(b);
}
}
}
return new BlockListAsLongs(blocks, null);
return report.build();
}
@Override

View File

@ -107,17 +107,18 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
StorageBlockReport reports[] =
new StorageBlockReport[cluster.getStoragesPerDatanode()];
ArrayList<Block> blocks = new ArrayList<Block>();
ArrayList<Replica> blocks = new ArrayList<Replica>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
blocks.add(locatedBlock.getBlock().getLocalBlock());
Block localBlock = locatedBlock.getBlock().getLocalBlock();
blocks.add(new FinalizedReplica(localBlock, null, null));
}
BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
BlockListAsLongs bll = new BlockListAsLongs(blocks);
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs());
reports[i] = new StorageBlockReport(dns, bll);
}
// Should not assert!

View File

@ -25,11 +25,9 @@
import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -188,7 +186,7 @@ public void testVolumeFailure() throws Exception {
DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue();
reports[reportIndex++] =
new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
new StorageBlockReport(dnStorage, blockList);
}
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);

View File

@ -98,7 +98,7 @@ private void verifyCapturedArguments(
assertThat(reports.length, is(expectedReportsPerCall));
for (StorageBlockReport report : reports) {
BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
BlockListAsLongs blockList = report.getBlocks();
numBlocksReported += blockList.getNumberOfBlocks();
}
}

View File

@ -195,7 +195,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
final Map<DatanodeStorage, BlockListAsLongs> result =
new HashMap<DatanodeStorage, BlockListAsLongs>();
result.put(storage, new BlockListAsLongs(null, null));
result.put(storage, BlockListAsLongs.EMPTY);
return result;
}

View File

@ -26,6 +26,7 @@
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -36,6 +37,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -891,9 +893,9 @@ private static class TinyDatanode implements Comparable<String> {
NamespaceInfo nsInfo;
DatanodeRegistration dnRegistration;
DatanodeStorage storage; //only one storage
final ArrayList<Block> blocks;
final ArrayList<BlockReportReplica> blocks;
int nrBlocks; // actual number of blocks
long[] blockReportList;
BlockListAsLongs blockReportList;
final int dnIdx;
private static int getNodePort(int num) throws IOException {
@ -904,7 +906,7 @@ private static int getNodePort(int num) throws IOException {
TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
this.dnIdx = dnIdx;
this.blocks = new ArrayList<Block>(blockCapacity);
this.blocks = new ArrayList<BlockReportReplica>(blockCapacity);
this.nrBlocks = 0;
}
@ -934,8 +936,7 @@ void register() throws IOException {
//first block reports
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
final StorageBlockReport[] reports = {
new StorageBlockReport(storage,
new BlockListAsLongs(null, null).getBlockListAsLongs())
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
nameNodeProto.blockReport(dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), reports);
@ -968,19 +969,21 @@ boolean addBlock(Block blk) {
}
return false;
}
blocks.set(nrBlocks, blk);
blocks.set(nrBlocks, new BlockReportReplica(blk));
nrBlocks++;
return true;
}
void formBlockReport() {
// fill remaining slots with blocks that do not exist
for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
blockReportList = new BlockListAsLongs(blocks).getBlockListAsLongs();
for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) {
Block block = new Block(blocks.size() - idx, 0, 0);
blocks.set(idx, new BlockReportReplica(block));
}
blockReportList = BlockListAsLongs.EMPTY;
}
long[] getBlockReportList() {
BlockListAsLongs getBlockReportList() {
return blockReportList;
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -104,7 +105,7 @@ public void testDeadDatanode() throws Exception {
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(
new DatanodeStorage(reg.getDatanodeUuid()),
new long[] { 0L, 0L, 0L }) };
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report);
fail("Expected IOException is not thrown");

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -219,6 +220,7 @@ public void testZeroBlockSize() throws Exception {
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.waitSafeMode(false)
.startupOption(StartupOption.UPGRADE)
.build();
try {
FileSystem fs = cluster.getFileSystem();

View File

@ -32,6 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper;
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags;
import org.apache.hadoop.test.PathUtils;
@ -140,8 +141,8 @@ public void testRecoveryMode() throws IOException {
assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
// judgment time
assertTrue("Test round trip",
filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
assertTrue("Test round trip", FileUtils.contentEqualsIgnoreEOL(
new File(editsParsedXml), new File(editsParsedXml2), "UTF-8"));
os.close();
}
@ -238,6 +239,10 @@ private boolean filesEqualIgnoreTrailingZeros(String filenameSmall,
ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall));
ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge));
// OEV outputs with the latest layout version, so tweak the old file's
// contents to have latest version so checkedin binary files don't
// require frequent updates
small.put(3, (byte)NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// now correct if it's otherwise
if (small.capacity() > large.capacity()) {