HDFS-8468. 2 RPC calls for every file read in DFSClient#open(..) resulting in double Audit log entries (Contributed by Vinayakumar B)
This commit is contained in:
parent
093907d728
commit
0b7af27b9a
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
|
||||
/**
|
||||
* Collection of blocks with their locations and the file length.
|
||||
@ -37,6 +38,8 @@ public class LocatedBlocks {
|
||||
private final LocatedBlock lastLocatedBlock;
|
||||
private final boolean isLastBlockComplete;
|
||||
private final FileEncryptionInfo fileEncryptionInfo;
|
||||
private final ECSchema ecSchema;
|
||||
private final int stripeCellSize;
|
||||
|
||||
public LocatedBlocks() {
|
||||
fileLength = 0;
|
||||
@ -45,17 +48,22 @@ public LocatedBlocks() {
|
||||
lastLocatedBlock = null;
|
||||
isLastBlockComplete = false;
|
||||
fileEncryptionInfo = null;
|
||||
ecSchema = null;
|
||||
stripeCellSize = 0;
|
||||
}
|
||||
|
||||
public LocatedBlocks(long flength, boolean isUnderConstuction,
|
||||
List<LocatedBlock> blks, LocatedBlock lastBlock,
|
||||
boolean isLastBlockCompleted, FileEncryptionInfo feInfo) {
|
||||
List<LocatedBlock> blks, LocatedBlock lastBlock,
|
||||
boolean isLastBlockCompleted, FileEncryptionInfo feInfo,
|
||||
ECSchema ecSchema, int stripeCellSize) {
|
||||
fileLength = flength;
|
||||
blocks = blks;
|
||||
underConstruction = isUnderConstuction;
|
||||
this.lastLocatedBlock = lastBlock;
|
||||
this.isLastBlockComplete = isLastBlockCompleted;
|
||||
this.fileEncryptionInfo = feInfo;
|
||||
this.ecSchema = ecSchema;
|
||||
this.stripeCellSize = stripeCellSize;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -111,6 +119,20 @@ public FileEncryptionInfo getFileEncryptionInfo() {
|
||||
return fileEncryptionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The ECSchema for ErasureCoded file, null otherwise.
|
||||
*/
|
||||
public ECSchema getECSchema() {
|
||||
return ecSchema;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Stripe Cell size for ErasureCoded file, 0 otherwise.
|
||||
*/
|
||||
public int getStripeCellSize() {
|
||||
return stripeCellSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find block containing specified offset.
|
||||
*
|
||||
|
@ -479,7 +479,7 @@ static LocatedBlocks toLocatedBlocks(
|
||||
(Map<?, ?>) m.get("lastLocatedBlock"));
|
||||
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
|
||||
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
|
||||
lastLocatedBlock, isLastBlockComplete, null);
|
||||
lastLocatedBlock, isLastBlockComplete, null, null, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -323,3 +323,6 @@
|
||||
|
||||
HDFS-8253. DFSStripedOutputStream.closeThreads releases cellBuffers
|
||||
multiple times. (Kai Sasaki via szetszwo)
|
||||
|
||||
HDFS-8468. 2 RPC calls for every file read in DFSClient#open(..) resulting in
|
||||
double Audit log entries (vinayakumarb)
|
@ -1192,15 +1192,17 @@ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
|
||||
// Get block info from namenode
|
||||
TraceScope scope = getPathTraceScope("newDFSInputStream", src);
|
||||
try {
|
||||
HdfsFileStatus fileInfo = getFileInfo(src);
|
||||
if (fileInfo != null) {
|
||||
ECSchema schema = fileInfo.getECSchema();
|
||||
LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
|
||||
if (locatedBlocks != null) {
|
||||
ECSchema schema = locatedBlocks.getECSchema();
|
||||
if (schema != null) {
|
||||
return new DFSStripedInputStream(this, src, verifyChecksum, schema,
|
||||
fileInfo.getStripeCellSize());
|
||||
locatedBlocks.getStripeCellSize(), locatedBlocks);
|
||||
}
|
||||
return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
|
||||
} else {
|
||||
throw new IOException("Cannot open filename " + src);
|
||||
}
|
||||
return new DFSInputStream(this, src, verifyChecksum);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
|
@ -253,24 +253,28 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
|
||||
deadNodes.put(dnInfo, dnInfo);
|
||||
}
|
||||
|
||||
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
|
||||
) throws IOException, UnresolvedLinkException {
|
||||
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
|
||||
LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
|
||||
this.dfsClient = dfsClient;
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
this.src = src;
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
|
||||
}
|
||||
openInfo();
|
||||
this.locatedBlocks = locatedBlocks;
|
||||
openInfo(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Grab the open-file info from namenode
|
||||
* @param refreshLocatedBlocks whether to re-fetch locatedblocks
|
||||
*/
|
||||
void openInfo() throws IOException, UnresolvedLinkException {
|
||||
void openInfo(boolean refreshLocatedBlocks) throws IOException,
|
||||
UnresolvedLinkException {
|
||||
final DfsClientConf conf = dfsClient.getConf();
|
||||
synchronized(infoLock) {
|
||||
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
|
||||
lastBlockBeingWrittenLength =
|
||||
fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
|
||||
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
|
||||
while (retriesForLastBlockLength > 0) {
|
||||
// Getting last block length as -1 is a special case. When cluster
|
||||
@ -282,7 +286,8 @@ void openInfo() throws IOException, UnresolvedLinkException {
|
||||
+ "Datanodes might not have reported blocks completely."
|
||||
+ " Will retry for " + retriesForLastBlockLength + " times");
|
||||
waitFor(conf.getRetryIntervalForGetLastBlockLength());
|
||||
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
|
||||
lastBlockBeingWrittenLength =
|
||||
fetchLocatedBlocksAndGetLastBlockLength(true);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -303,8 +308,12 @@ private void waitFor(int waitTime) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
|
||||
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
|
||||
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
|
||||
throws IOException {
|
||||
LocatedBlocks newInfo = locatedBlocks;
|
||||
if (locatedBlocks == null || refresh) {
|
||||
newInfo = dfsClient.getLocatedBlocks(src, 0);
|
||||
}
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("newInfo = " + newInfo);
|
||||
}
|
||||
@ -1015,7 +1024,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||
} catch (InterruptedException iex) {
|
||||
}
|
||||
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
|
||||
openInfo();
|
||||
openInfo(true);
|
||||
block = refreshLocatedBlock(block);
|
||||
failures++;
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
@ -139,9 +140,10 @@ boolean include(long pos) {
|
||||
private final CompletionService<Void> readingService;
|
||||
private ReaderRetryPolicy retry;
|
||||
|
||||
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
|
||||
ECSchema schema, int cellSize) throws IOException {
|
||||
super(dfsClient, src, verifyChecksum);
|
||||
DFSStripedInputStream(DFSClient dfsClient, String src,
|
||||
boolean verifyChecksum, ECSchema schema, int cellSize,
|
||||
LocatedBlocks locatedBlocks) throws IOException {
|
||||
super(dfsClient, src, verifyChecksum, locatedBlocks);
|
||||
|
||||
assert schema != null;
|
||||
this.schema = schema;
|
||||
|
@ -1338,8 +1338,9 @@ public static LocatedBlocks convert(LocatedBlocksProto lb) {
|
||||
lb.hasLastBlock() ?
|
||||
PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
|
||||
lb.getIsLastBlockComplete(),
|
||||
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
|
||||
null);
|
||||
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null,
|
||||
lb.hasECSchema() ? convertECSchema(lb.getECSchema()) : null,
|
||||
lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0);
|
||||
}
|
||||
|
||||
public static LocatedBlocksProto convert(LocatedBlocks lb) {
|
||||
@ -1355,6 +1356,12 @@ public static LocatedBlocksProto convert(LocatedBlocks lb) {
|
||||
if (lb.getFileEncryptionInfo() != null) {
|
||||
builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
|
||||
}
|
||||
if (lb.getECSchema() != null) {
|
||||
builder.setECSchema(convertECSchema(lb.getECSchema()));
|
||||
}
|
||||
if (lb.getStripeCellSize() != 0) {
|
||||
builder.setStripeCellSize(lb.getStripeCellSize());
|
||||
}
|
||||
return builder.setFileLength(lb.getFileLength())
|
||||
.setUnderConstruction(lb.isUnderConstruction())
|
||||
.addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
|
||||
|
@ -938,14 +938,18 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
|
||||
final long fileSizeExcludeBlocksUnderConstruction,
|
||||
final boolean isFileUnderConstruction, final long offset,
|
||||
final long length, final boolean needBlockToken,
|
||||
final boolean inSnapshot, FileEncryptionInfo feInfo)
|
||||
final boolean inSnapshot, FileEncryptionInfo feInfo,
|
||||
ErasureCodingZone ecZone)
|
||||
throws IOException {
|
||||
assert namesystem.hasReadLock();
|
||||
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
||||
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
||||
if (blocks == null) {
|
||||
return null;
|
||||
} else if (blocks.length == 0) {
|
||||
return new LocatedBlocks(0, isFileUnderConstruction,
|
||||
Collections.<LocatedBlock>emptyList(), null, false, feInfo);
|
||||
Collections.<LocatedBlock> emptyList(), null, false, feInfo, schema,
|
||||
cellSize);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
||||
@ -968,9 +972,9 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
|
||||
fileSizeExcludeBlocksUnderConstruction, mode);
|
||||
isComplete = true;
|
||||
}
|
||||
return new LocatedBlocks(
|
||||
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
|
||||
locatedblocks, lastlb, isComplete, feInfo);
|
||||
return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction,
|
||||
isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
|
||||
schema, cellSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,6 +445,8 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
|
||||
final boolean isEncrypted;
|
||||
final FileEncryptionInfo feInfo = isRawPath ? null :
|
||||
fsd.getFileEncryptionInfo(node, snapshot, iip);
|
||||
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
|
||||
fsd.getFSNamesystem(), iip);
|
||||
if (node.isFile()) {
|
||||
final INodeFile fileNode = node.asFile();
|
||||
size = fileNode.computeFileSize(snapshot);
|
||||
@ -458,7 +460,7 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
|
||||
|
||||
loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
|
||||
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
|
||||
inSnapshot, feInfo);
|
||||
inSnapshot, feInfo, ecZone);
|
||||
if (loc == null) {
|
||||
loc = new LocatedBlocks();
|
||||
}
|
||||
@ -469,8 +471,6 @@ private static HdfsLocatedFileStatus createLocatedFileStatus(
|
||||
}
|
||||
int childrenNum = node.isDirectory() ?
|
||||
node.asDirectory().getChildrenNum(snapshot) : 0;
|
||||
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
|
||||
fsd.getFSNamesystem(), iip);
|
||||
final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
|
||||
final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
|
||||
|
||||
|
@ -1873,10 +1873,12 @@ private GetBlockLocationsResult getBlockLocationsInt(
|
||||
final FileEncryptionInfo feInfo =
|
||||
FSDirectory.isReservedRawName(srcArg) ? null
|
||||
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
|
||||
final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
|
||||
this, iip);
|
||||
|
||||
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
|
||||
inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
|
||||
length, needBlockToken, iip.isSnapshot(), feInfo);
|
||||
length, needBlockToken, iip.isSnapshot(), feInfo, ecZone);
|
||||
|
||||
// Set caching information for the located blocks.
|
||||
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
|
||||
|
@ -301,6 +301,10 @@ message LocatedBlocksProto {
|
||||
optional LocatedBlockProto lastBlock = 4;
|
||||
required bool isLastBlockComplete = 5;
|
||||
optional FileEncryptionInfoProto fileEncryptionInfo = 6;
|
||||
|
||||
// Optional field for erasure coding
|
||||
optional ECSchemaProto eCSchema = 7;
|
||||
optional uint32 stripeCellSize = 8;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -343,7 +343,7 @@ public void testFailuresArePerOperation() throws Exception
|
||||
// we're starting a new operation on the user level.
|
||||
doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
|
||||
.when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
|
||||
is.openInfo();
|
||||
is.openInfo(true);
|
||||
// Seek to beginning forces a reopen of the BlockReader - otherwise it'll
|
||||
// just keep reading on the existing stream and the fact that we've poisoned
|
||||
// the block info won't do anything.
|
||||
@ -496,7 +496,7 @@ private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
|
||||
badBlocks.add(badLocatedBlock);
|
||||
return new LocatedBlocks(goodBlockList.getFileLength(), false,
|
||||
badBlocks, null, true,
|
||||
null);
|
||||
null, null, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,7 @@ public void testRefreshBlock() throws Exception {
|
||||
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
|
||||
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
|
||||
final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||
filePath.toString(), false, schema, CELLSIZE);
|
||||
filePath.toString(), false, schema, CELLSIZE, null);
|
||||
|
||||
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
|
||||
for (LocatedBlock aLbList : lbList) {
|
||||
@ -153,7 +153,7 @@ public void testPread() throws Exception {
|
||||
}
|
||||
}
|
||||
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||
filePath.toString(), false, schema, CELLSIZE);
|
||||
filePath.toString(), false, schema, CELLSIZE, null);
|
||||
|
||||
int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
|
||||
CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
|
||||
@ -195,7 +195,7 @@ public void testPreadWithDNFailure() throws Exception {
|
||||
}
|
||||
DFSStripedInputStream in =
|
||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
|
||||
ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
|
||||
ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE, null);
|
||||
int readSize = BLOCK_GROUP_SIZE;
|
||||
byte[] readBuffer = new byte[readSize];
|
||||
byte[] expected = new byte[readSize];
|
||||
@ -293,7 +293,7 @@ private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
|
||||
|
||||
DFSStripedInputStream in =
|
||||
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
|
||||
false, schema, CELLSIZE);
|
||||
false, schema, CELLSIZE, null);
|
||||
|
||||
byte[] expected = new byte[fileSize];
|
||||
|
||||
|
@ -110,7 +110,7 @@ public void testLocatedBlocks2Locations() {
|
||||
l2.setCorrupt(true);
|
||||
|
||||
List<LocatedBlock> ls = Arrays.asList(l1, l2);
|
||||
LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null);
|
||||
LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null, 0);
|
||||
|
||||
BlockLocation[] bs = DFSUtilClient.locatedBlocks2Locations(lbs);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user