HDFS-10312. Large block reports may fail to decode at NameNode due to 64 MB protobuf maximum length restriction. Contributed by Chris Nauroth.

This commit is contained in:
Chris Nauroth 2016-04-20 13:39:44 -07:00
parent ad36fa6f42
commit 63ac2db59a
7 changed files with 258 additions and 25 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.protocol;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -31,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
@ -63,26 +66,42 @@ public Iterator<BlockReportReplica> iterator() {
};
/**
* Prepare an instance to in-place decode the given ByteString buffer
* Prepare an instance to in-place decode the given ByteString buffer.
* @param numBlocks - blocks in the buffer
* @param blocksBuf - ByteString encoded varints
* @param maxDataLength - maximum allowable data size in protobuf message
* @return BlockListAsLongs
*/
public static BlockListAsLongs decodeBuffer(final int numBlocks,
final ByteString blocksBuf) {
return new BufferDecoder(numBlocks, blocksBuf);
final ByteString blocksBuf, final int maxDataLength) {
return new BufferDecoder(numBlocks, blocksBuf, maxDataLength);
}
/**
* Prepare an instance to in-place decode the given ByteString buffers
* Prepare an instance to in-place decode the given ByteString buffers.
* @param numBlocks - blocks in the buffers
* @param blocksBufs - list of ByteString encoded varints
* @return BlockListAsLongs
*/
@VisibleForTesting
public static BlockListAsLongs decodeBuffers(final int numBlocks,
final List<ByteString> blocksBufs) {
return decodeBuffers(numBlocks, blocksBufs,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
}
/**
* Prepare an instance to in-place decode the given ByteString buffers.
* @param numBlocks - blocks in the buffers
* @param blocksBufs - list of ByteString encoded varints
* @param maxDataLength - maximum allowable data size in protobuf message
* @return BlockListAsLongs
*/
public static BlockListAsLongs decodeBuffers(final int numBlocks,
final List<ByteString> blocksBufs, final int maxDataLength) {
// this doesn't actually copy the data
return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs),
maxDataLength);
}
/**
@ -93,7 +112,21 @@ public static BlockListAsLongs decodeBuffers(final int numBlocks,
* @return BlockListAsLongs
*/
public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
return decodeLongs(blocksList, IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
}
/**
* Prepare an instance to in-place decode the given list of Longs. Note
* it's much more efficient to decode ByteString buffers and only exists
* for compatibility.
* @param blocksList - list of longs
* @param maxDataLength - maximum allowable data size in protobuf message
* @return BlockListAsLongs
*/
public static BlockListAsLongs decodeLongs(List<Long> blocksList,
int maxDataLength) {
return blocksList.isEmpty() ? EMPTY :
new LongsDecoder(blocksList, maxDataLength);
}
/**
@ -102,17 +135,22 @@ public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
* @param replicas - replicas to encode
* @return BlockListAsLongs
*/
@VisibleForTesting
public static BlockListAsLongs encode(
final Collection<? extends Replica> replicas) {
BlockListAsLongs.Builder builder = builder();
BlockListAsLongs.Builder builder = builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
for (Replica replica : replicas) {
builder.add(replica);
}
return builder.build();
}
public static BlockListAsLongs readFrom(InputStream is) throws IOException {
public static BlockListAsLongs readFrom(InputStream is, int maxDataLength)
throws IOException {
CodedInputStream cis = CodedInputStream.newInstance(is);
if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
cis.setSizeLimit(maxDataLength);
}
int numBlocks = -1;
ByteString blocksBuf = null;
while (!cis.isAtEnd()) {
@ -133,7 +171,7 @@ public static BlockListAsLongs readFrom(InputStream is) throws IOException {
}
}
if (numBlocks != -1 && blocksBuf != null) {
return decodeBuffer(numBlocks, blocksBuf);
return decodeBuffer(numBlocks, blocksBuf, maxDataLength);
}
return null;
}
@ -145,8 +183,13 @@ public void writeTo(OutputStream os) throws IOException {
cos.flush();
}
@VisibleForTesting
public static Builder builder() {
return new BlockListAsLongs.Builder();
return builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
}
public static Builder builder(int maxDataLength) {
return new BlockListAsLongs.Builder(maxDataLength);
}
/**
@ -221,10 +264,12 @@ public static class Builder {
private final CodedOutputStream cos;
private int numBlocks = 0;
private int numFinalized = 0;
private final int maxDataLength;
Builder() {
Builder(int maxDataLength) {
out = ByteString.newOutput(64*1024);
cos = CodedOutputStream.newInstance(out);
this.maxDataLength = maxDataLength;
}
public void add(Replica replica) {
@ -258,7 +303,8 @@ public BlockListAsLongs build() {
// shouldn't happen, ByteString.Output doesn't throw IOE
throw new IllegalStateException(ioe);
}
return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
return new BufferDecoder(numBlocks, numFinalized, out.toByteString(),
maxDataLength);
}
}
@ -273,16 +319,19 @@ private static class BufferDecoder extends BlockListAsLongs {
private final ByteString buffer;
private final int numBlocks;
private int numFinalized;
private final int maxDataLength;
BufferDecoder(final int numBlocks, final ByteString buf) {
this(numBlocks, -1, buf);
BufferDecoder(final int numBlocks, final ByteString buf,
final int maxDataLength) {
this(numBlocks, -1, buf, maxDataLength);
}
BufferDecoder(final int numBlocks, final int numFinalized,
final ByteString buf) {
final ByteString buf, final int maxDataLength) {
this.numBlocks = numBlocks;
this.numFinalized = numFinalized;
this.buffer = buf;
this.maxDataLength = maxDataLength;
}
@Override
@ -349,6 +398,12 @@ public Iterator<BlockReportReplica> iterator() {
final CodedInputStream cis = buffer.newCodedInput();
private int currentBlockIndex = 0;
{
if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
cis.setSizeLimit(maxDataLength);
}
}
@Override
public boolean hasNext() {
return currentBlockIndex < numBlocks;
@ -384,12 +439,14 @@ private static class LongsDecoder extends BlockListAsLongs {
private final List<Long> values;
private final int finalizedBlocks;
private final int numBlocks;
private final int maxDataLength;
// set the header
LongsDecoder(List<Long> values) {
LongsDecoder(List<Long> values, int maxDataLength) {
this.values = values.subList(2, values.size());
this.finalizedBlocks = values.get(0).intValue();
this.numBlocks = finalizedBlocks + values.get(1).intValue();
this.maxDataLength = maxDataLength;
}
@Override
@ -399,7 +456,7 @@ public int getNumberOfBlocks() {
@Override
public ByteString getBlocksBuffer() {
Builder builder = builder();
Builder builder = builder(maxDataLength);
for (Replica replica : this) {
builder.add(replica);
}

View File

@ -68,6 +68,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
DatanodeProtocolPB {
private final DatanodeProtocol impl;
private final int maxDataLength;
private static final ErrorReportResponseProto
VOID_ERROR_REPORT_RESPONSE_PROTO =
ErrorReportResponseProto.newBuilder().build();
@ -81,8 +83,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
CommitBlockSynchronizationResponseProto.newBuilder().build();
public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl,
int maxDataLength) {
this.impl = impl;
this.maxDataLength = maxDataLength;
}
@Override
@ -162,9 +166,10 @@ public BlockReportResponseProto blockReport(RpcController controller,
int num = (int)s.getNumberOfBlocks();
Preconditions.checkState(s.getBlocksCount() == 0,
"cannot send both blocks list and buffers");
blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList(),
maxDataLength);
} else {
blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
blocks = BlockListAsLongs.decodeLongs(s.getBlocksList(), maxDataLength);
}
report[index++] = new StorageBlockReport(PBHelperClient.convert(s.getStorage()),
blocks);

View File

@ -37,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -91,6 +92,7 @@ class BlockPoolSlice {
private AtomicLong numOfBlocks = new AtomicLong();
private final long cachedDfsUsedCheckTime;
private final Timer timer;
private final int maxDataLength;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
@ -128,6 +130,11 @@ class BlockPoolSlice {
conf.getLong(
DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
this.maxDataLength = conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
this.timer = timer;
// Files that were being written when the datanode was last shutdown
@ -760,7 +767,8 @@ private boolean readReplicasFromCache(ReplicaMap volumeMap,
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(replicaFile);
BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream);
BlockListAsLongs blocksList =
BlockListAsLongs.readFrom(inputStream, maxDataLength);
Iterator<BlockReportReplica> iterator = blocksList.iterator();
while (iterator.hasNext()) {
BlockReportReplica replica = iterator.next();

View File

@ -50,6 +50,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@ -255,6 +256,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final LocalFileSystem localFS;
private boolean blockPinningEnabled;
private final int maxDataLength;
/**
* An FSDataset has a directory where it loads its data files.
@ -333,6 +335,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
blockPinningEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
maxDataLength = conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
}
/**
@ -1733,7 +1738,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder());
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
}
synchronized(this) {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
@ -255,8 +257,10 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);
int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
new DatanodeProtocolServerSideTranslatorPB(this);
new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests that very large block reports can pass through the RPC server and
* deserialization layers successfully if configured.
*/
public class TestLargeBlockReport {
private final HdfsConfiguration conf = new HdfsConfiguration();
private MiniDFSCluster cluster;
private DataNode dn;
private BPOfferService bpos;
private DatanodeProtocolClientSideTranslatorPB nnProxy;
private DatanodeRegistration bpRegistration;
private String bpId;
private DatanodeStorage dnStorage;
private final long reportId = 1;
private final long fullBrLeaseId = 0;
private final boolean sorted = true;
@BeforeClass
public static void init() {
DFSTestUtil.setNameNodeLogLevel(Level.WARN);
FsDatasetImplTestUtils.setFsDatasetImplLogLevel(Level.WARN);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testBlockReportExceedsLengthLimit() throws Exception {
initCluster();
// Create a large enough report that we expect it will go beyond the RPC
// server's length validation, and also protobuf length validation.
StorageBlockReport[] reports = createReports(6000000);
try {
nnProxy.blockReport(bpRegistration, bpId, reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
fail("Should have failed because of the too long RPC data length");
} catch (Exception e) {
// Expected. We can't reliably assert anything about the exception type
// or the message. The NameNode just disconnects, and the details are
// buried in the NameNode log.
}
}
@Test
public void testBlockReportSucceedsWithLargerLengthLimit() throws Exception {
conf.setInt(IPC_MAXIMUM_DATA_LENGTH, 128 * 1024 * 1024); // 128 MB
initCluster();
StorageBlockReport[] reports = createReports(6000000);
nnProxy.blockReport(bpRegistration, bpId, reports,
new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
}
/**
* Creates storage block reports, consisting of a single report with the
* requested number of blocks. The block data is fake, because the tests just
* need to validate that the messages can pass correctly. This intentionally
* uses the old-style decoding method as a helper. The test needs to cover
* the new-style encoding technique. Passing through that code path here
* would trigger an exception before the test is ready to deal with it.
*
* @param numBlocks requested number of blocks
* @return storage block reports
*/
private StorageBlockReport[] createReports(int numBlocks) {
int longsPerBlock = 3;
int blockListSize = 2 + numBlocks * longsPerBlock;
List<Long> longs = new ArrayList<Long>(blockListSize);
longs.add(Long.valueOf(numBlocks));
longs.add(0L);
for (int i = 0; i < blockListSize; ++i) {
longs.add(Long.valueOf(i));
}
BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
StorageBlockReport[] reports = new StorageBlockReport[] {
new StorageBlockReport(dnStorage, blockList) };
return reports;
}
/**
* Start a mini-cluster, and set up everything the tests need to use it.
*
* @throws Exception if initialization fails
*/
private void initCluster() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
dn = cluster.getDataNodes().get(0);
bpos = dn.getAllBpOs().get(0);
nnProxy = bpos.getActiveNN();
bpRegistration = bpos.bpRegistration;
bpId = bpos.getBlockPoolId();
dnStorage = dn.getFSDataset().getBlockReports(bpId).keySet().iterator()
.next();
}
}

View File

@ -42,6 +42,8 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import java.io.File;
import java.io.FileNotFoundException;
@ -465,4 +467,13 @@ public void verifyBlockPoolMissing(String bpid) throws IOException {
String.format("Block pool directory %s exists", bpDir));
}
}
/**
* Change the log level used by FsDatasetImpl.
*
* @param level the level to set
*/
public static void setFsDatasetImplLogLevel(Level level) {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
}
}