HDFS-7936. Erasure coding: resolving conflicts in the branch when merging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.
commit 35797b0889 (parent f53e402635)
@@ -1109,7 +1109,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, block, start, end, buf,
+    actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
         new int[]{offset}, new int[]{length}, corruptedBlockMap);
   }
 
@@ -1128,7 +1128,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
    * block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      LocatedBlock block, final long startInBlk, final long endInBlk,
+      long blockStartOffset, final long startInBlk, final long endInBlk,
       byte[] buf, int[] offsets, int[] lengths,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
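The two hunks above change actualGetFromOneDataNode() to take the block's start offset (a stable long) rather than a LocatedBlock handle that can go stale while a read is queued or retried. A minimal sketch of the idea, with hypothetical names (BlockResolver, resolve) standing in for the real HDFS types:

import java.util.concurrent.Callable;

// Hypothetical stand-in for "look up fresh block locations by file offset";
// in HDFS this role is played by methods like getBlockGroupAt(long).
interface BlockResolver {
  String resolve(long blockStartOffset);
}

class OffsetBasedReadSketch {
  // The callable captures only the immutable offset. Locations are
  // re-resolved at execution time, so a retried or hedged attempt never
  // operates on a block handle fetched before the locations changed.
  static Callable<String> readAt(final BlockResolver resolver,
                                 final long blockStartOffset) {
    return new Callable<String>() {
      @Override
      public String call() {
        return resolver.resolve(blockStartOffset);
      }
    };
  }
}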
@@ -224,7 +224,7 @@ private LocatedBlock getBlockGroupAt(long offset) throws IOException {
    * Real implementation of pread.
    */
   @Override
-  protected void fetchBlockByteRange(LocatedBlock block, long start,
+  protected void fetchBlockByteRange(long blockStartOffset, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -234,7 +234,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
     int len = (int) (end - start + 1);
 
     // Refresh the striped block group
-    block = getBlockGroupAt(block.getStartOffset());
+    LocatedBlock block = getBlockGroupAt(blockStartOffset);
     assert block instanceof LocatedStripedBlock : "NameNode" +
         " should return a LocatedStripedBlock for a striped file";
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
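The hunk above is where the saved offset pays off: instead of reusing a possibly stale LocatedBlock from the caller, fetchBlockByteRange() re-fetches the block group at blockStartOffset and then checks that it is actually striped. A hedged sketch of that refresh-then-check step, using illustrative types in place of LocatedBlock / LocatedStripedBlock:

class Located { }
class LocatedStriped extends Located { }

class RefreshSketch {
  Located getBlockGroupAt(long offset) {
    // In HDFS this asks the NameNode for current locations at this offset.
    return new LocatedStriped();
  }

  void fetchRange(long blockStartOffset) {
    // Refresh rather than trust a handle fetched earlier by the caller.
    Located block = getBlockGroupAt(blockStartOffset);
    assert block instanceof LocatedStriped
        : "NameNode should return a striped block group for a striped file";
    LocatedStriped group = (LocatedStriped) block;
    // ... fan out per-internal-block reads from group ...
  }
}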
@@ -254,9 +254,11 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
       DatanodeInfo loc = blks[i].getLocations()[0];
       StorageType type = blks[i].getStorageTypes()[0];
       DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
-          loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
-      Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
-          rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+          type);
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+          blks[i].getStartOffset(), rp.startOffsetInBlock,
+          rp.startOffsetInBlock + rp.readLength - 1, buf,
           rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
       Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
       DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
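The hunk above also shows the fan-out itself: one read callable per internal block of the stripe, each submitted to a shared executor, with the futures kept for later joining. A minimal, self-contained sketch of that submission pattern (the pool and callables are placeholders for stripedReadsService and getFromOneDataNode()):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class StripeFanOutSketch {
  // Submit every per-block read to the shared pool and collect the futures
  // so the caller can wait for (or cancel) the whole stripe.
  static List<Future<Void>> submitAll(ExecutorService stripedReadsPool,
                                      List<Callable<Void>> perBlockReads) {
    List<Future<Void>> pending = new ArrayList<Future<Void>>();
    for (Callable<Void> read : perBlockReads) {
      pending.add(stripedReadsPool.submit(read));
    }
    return pending;
  }
}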
@@ -272,7 +274,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
   }
 
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
-      final LocatedBlock block, final long start, final long end,
+      final long blockStartOffset, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -283,7 +285,7 @@ public Void call() throws Exception {
         TraceScope scope =
             Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, block, start,
+          actualGetFromOneDataNode(datanode, blockStartOffset, start,
               end, buf, offsets, lengths, corruptedBlockMap);
         } finally {
           scope.close();
@@ -284,7 +284,8 @@ synchronized void abort() throws IOException {
     }
     for (StripedDataStreamer streamer : streamers) {
       streamer.setLastException(new IOException("Lease timeout of "
-          + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+          + (dfsClient.getConf().getHdfsTimeout()/1000) +
+          " seconds expired."));
     }
     closeThreads(true);
     dfsClient.endFileLease(fileId);
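The remaining hunks track HDFS-8048: client settings move off DFSClient (and its inner DFSClient.Conf) onto a DfsClientConf object reached via dfsClient.getConf(), with getter methods replacing direct field reads, so connectToDnViaHostname becomes isConnectToDnViaHostname() and getHdfsTimeout() moves onto the conf object. A rough sketch of the shape of that conf object, with illustrative names and values rather than the real DfsClientConf API:

class ClientConfSketch {
  private final boolean connectToDnViaHostname;
  private final int hdfsTimeout;

  ClientConfSketch(boolean connectToDnViaHostname, int hdfsTimeout) {
    this.connectToDnViaHostname = connectToDnViaHostname;
    this.hdfsTimeout = hdfsTimeout;
  }

  // Callers now go through accessors, e.g.
  // dfsClient.getConf().isConnectToDnViaHostname(), instead of reading
  // a public field on an inner Conf class.
  boolean isConnectToDnViaHostname() { return connectToDnViaHostname; }
  int getHdfsTimeout() { return hdfsTimeout; }
}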
@@ -5,6 +5,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -241,7 +242,7 @@ private void testOneFile(String src, int writeBytes)
     }
 
     block.setNumBytes(lenOfBlock);
-    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+    BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
       setFileName(src).
       setBlock(block).
       setBlockToken(lblock.getBlockToken()).