svn merge -c -1428729 . for reverting HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430663 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
837e17b2ea
commit
fab2cbc2c1
@ -177,9 +177,6 @@ Trunk (Unreleased)
|
||||
HDFS-4346. Add SequentialNumber as a base class for INodeId and
|
||||
GenerationStamp. (szetszwo)
|
||||
|
||||
HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
|
||||
(Colin Patrick McCabe via todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.net.Socket;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
|
||||
@ -39,150 +40,71 @@
|
||||
@InterfaceAudience.Private
|
||||
public class BlockReaderFactory {
|
||||
/**
|
||||
* Parameters for creating a BlockReader.
|
||||
*
|
||||
* Before you add something to here: think about whether it's already included
|
||||
* in Conf (or should be).
|
||||
* @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class Params {
|
||||
private final Conf conf;
|
||||
private Socket socket = null;
|
||||
private String file = null;
|
||||
private ExtendedBlock block = null;
|
||||
private Token<BlockTokenIdentifier> blockToken = null;
|
||||
private long startOffset = 0;
|
||||
private long len = -1;
|
||||
private int bufferSize;
|
||||
private boolean verifyChecksum = true;
|
||||
private boolean shortCircuitLocalReads = false;
|
||||
private String clientName = "";
|
||||
private DataEncryptionKey encryptionKey = null;
|
||||
private IOStreamPair ioStreamPair = null;
|
||||
|
||||
public Params(Conf conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = conf.ioBufferSize;
|
||||
}
|
||||
public Conf getConf() {
|
||||
return conf;
|
||||
}
|
||||
public Socket getSocket() {
|
||||
return socket;
|
||||
}
|
||||
public Params setSocket(Socket socket) {
|
||||
this.socket = socket;
|
||||
return this;
|
||||
}
|
||||
public String getFile() {
|
||||
return file;
|
||||
}
|
||||
public Params setFile(String file) {
|
||||
this.file = file;
|
||||
return this;
|
||||
}
|
||||
public ExtendedBlock getBlock() {
|
||||
return block;
|
||||
}
|
||||
public Params setBlock(ExtendedBlock block) {
|
||||
this.block = block;
|
||||
return this;
|
||||
}
|
||||
public Token<BlockTokenIdentifier> getBlockToken() {
|
||||
return blockToken;
|
||||
}
|
||||
public Params setBlockToken(Token<BlockTokenIdentifier> blockToken) {
|
||||
this.blockToken = blockToken;
|
||||
return this;
|
||||
}
|
||||
public long getStartOffset() {
|
||||
return startOffset;
|
||||
}
|
||||
public Params setStartOffset(long startOffset) {
|
||||
this.startOffset = startOffset;
|
||||
return this;
|
||||
}
|
||||
public long getLen() {
|
||||
return len;
|
||||
}
|
||||
public Params setLen(long len) {
|
||||
this.len = len;
|
||||
return this;
|
||||
}
|
||||
public int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
public Params setBufferSize(int bufferSize) {
|
||||
this.bufferSize = bufferSize;
|
||||
return this;
|
||||
}
|
||||
public boolean getVerifyChecksum() {
|
||||
return verifyChecksum;
|
||||
}
|
||||
public Params setVerifyChecksum(boolean verifyChecksum) {
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
return this;
|
||||
}
|
||||
public boolean getShortCircuitLocalReads() {
|
||||
return shortCircuitLocalReads;
|
||||
}
|
||||
public Params setShortCircuitLocalReads(boolean on) {
|
||||
this.shortCircuitLocalReads = on;
|
||||
return this;
|
||||
}
|
||||
public String getClientName() {
|
||||
return clientName;
|
||||
}
|
||||
public Params setClientName(String clientName) {
|
||||
this.clientName = clientName;
|
||||
return this;
|
||||
}
|
||||
public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
|
||||
this.encryptionKey = encryptionKey;
|
||||
return this;
|
||||
}
|
||||
public DataEncryptionKey getEncryptionKey() {
|
||||
return encryptionKey;
|
||||
}
|
||||
public IOStreamPair getIoStreamPair() {
|
||||
return ioStreamPair;
|
||||
}
|
||||
public Params setIoStreamPair(IOStreamPair ioStreamPair) {
|
||||
this.ioStreamPair = ioStreamPair;
|
||||
return this;
|
||||
}
|
||||
public static BlockReader newBlockReader(
|
||||
Configuration conf,
|
||||
Socket sock, String file,
|
||||
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
|
||||
long startOffset, long len, DataEncryptionKey encryptionKey)
|
||||
throws IOException {
|
||||
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
|
||||
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
return newBlockReader(new Conf(conf),
|
||||
sock, file, block, blockToken, startOffset,
|
||||
len, bufferSize, true, "", encryptionKey, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new BlockReader specifically to satisfy a read.
|
||||
* This method also sends the OP_READ_BLOCK request.
|
||||
*
|
||||
* @param params The parameters
|
||||
*
|
||||
* @return New BlockReader instance
|
||||
* @throws IOException If there was an error creating the BlockReader
|
||||
* @param conf the DFSClient configuration
|
||||
* @param sock An established Socket to the DN. The BlockReader will not close it normally
|
||||
* @param file File location
|
||||
* @param block The block object
|
||||
* @param blockToken The block token for security
|
||||
* @param startOffset The read offset, relative to block head
|
||||
* @param len The number of bytes to read
|
||||
* @param bufferSize The IO buffer size (not the client buffer size)
|
||||
* @param verifyChecksum Whether to verify checksum
|
||||
* @param clientName Client name
|
||||
* @return New BlockReader instance, or null on error.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public static BlockReader newBlockReader(Params params) throws IOException {
|
||||
if (params.getConf().useLegacyBlockReader) {
|
||||
if (params.getEncryptionKey() != null) {
|
||||
public static BlockReader newBlockReader(
|
||||
Conf conf,
|
||||
Socket sock, String file,
|
||||
ExtendedBlock block,
|
||||
Token<BlockTokenIdentifier> blockToken,
|
||||
long startOffset, long len,
|
||||
int bufferSize, boolean verifyChecksum,
|
||||
String clientName,
|
||||
DataEncryptionKey encryptionKey,
|
||||
IOStreamPair ioStreams)
|
||||
throws IOException {
|
||||
|
||||
if (conf.useLegacyBlockReader) {
|
||||
if (encryptionKey != null) {
|
||||
throw new RuntimeException("Encryption is not supported with the legacy block reader.");
|
||||
}
|
||||
return RemoteBlockReader.newBlockReader(params);
|
||||
return RemoteBlockReader.newBlockReader(
|
||||
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
|
||||
} else {
|
||||
Socket sock = params.getSocket();
|
||||
if (params.getIoStreamPair() == null) {
|
||||
params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
|
||||
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
|
||||
if (params.getEncryptionKey() != null) {
|
||||
if (ioStreams == null) {
|
||||
ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
|
||||
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
|
||||
if (encryptionKey != null) {
|
||||
IOStreamPair encryptedStreams =
|
||||
DataTransferEncryptor.getEncryptedStreams(
|
||||
params.getIoStreamPair().out, params.getIoStreamPair().in,
|
||||
params.getEncryptionKey());
|
||||
params.setIoStreamPair(encryptedStreams);
|
||||
ioStreams.out, ioStreams.in, encryptionKey);
|
||||
ioStreams = encryptedStreams;
|
||||
}
|
||||
}
|
||||
return RemoteBlockReader2.newBlockReader(params);
|
||||
|
||||
return RemoteBlockReader2.newBlockReader(
|
||||
sock, file, block, blockToken, startOffset, len, bufferSize,
|
||||
verifyChecksum, clientName, encryptionKey, ioStreams);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -200,8 +200,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
/**
|
||||
* DFSClient configuration
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class Conf {
|
||||
static class Conf {
|
||||
final int maxFailoverAttempts;
|
||||
final int failoverSleepBaseMillis;
|
||||
final int failoverSleepMaxMillis;
|
||||
@ -229,7 +228,7 @@ public static class Conf {
|
||||
final int getFileBlockStorageLocationsNumThreads;
|
||||
final int getFileBlockStorageLocationsTimeout;
|
||||
|
||||
public Conf(Configuration conf) {
|
||||
Conf(Configuration conf) {
|
||||
maxFailoverAttempts = conf.getInt(
|
||||
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
|
||||
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
|
||||
|
@ -934,15 +934,15 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
|
||||
|
||||
try {
|
||||
// The OP_READ_BLOCK request is sent as we make the BlockReader
|
||||
BlockReader reader = BlockReaderFactory.
|
||||
newBlockReader(new BlockReaderFactory.Params(dfsClient.getConf()).
|
||||
setFile(file).setBlock(block).setBlockToken(blockToken).
|
||||
setStartOffset(startOffset).setLen(len).
|
||||
setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
|
||||
setClientName(clientName).
|
||||
setEncryptionKey(dfsClient.getDataEncryptionKey()).
|
||||
setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
|
||||
setSocket(sock));
|
||||
BlockReader reader =
|
||||
BlockReaderFactory.newBlockReader(dfsClient.getConf(),
|
||||
sock, file, block,
|
||||
blockToken,
|
||||
startOffset, len,
|
||||
bufferSize, verifyChecksum,
|
||||
clientName,
|
||||
dfsClient.getDataEncryptionKey(),
|
||||
sockAndStreams == null ? null : sockAndStreams.ioStreams);
|
||||
return reader;
|
||||
} catch (IOException ex) {
|
||||
// Our socket is no good.
|
||||
|
@ -349,6 +349,13 @@ private RemoteBlockReader(String file, String bpid, long blockId,
|
||||
checksumSize = this.checksum.getChecksumSize();
|
||||
}
|
||||
|
||||
public static RemoteBlockReader newBlockReader(Socket sock, String file,
|
||||
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
|
||||
long startOffset, long len, int bufferSize) throws IOException {
|
||||
return newBlockReader(sock, file, block, blockToken, startOffset,
|
||||
len, bufferSize, true, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new BlockReader specifically to satisfy a read.
|
||||
* This method also sends the OP_READ_BLOCK request.
|
||||
@ -364,26 +371,29 @@ private RemoteBlockReader(String file, String bpid, long blockId,
|
||||
* @param clientName Client name
|
||||
* @return New BlockReader instance, or null on error.
|
||||
*/
|
||||
public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
throws IOException {
|
||||
public static RemoteBlockReader newBlockReader( Socket sock, String file,
|
||||
ExtendedBlock block,
|
||||
Token<BlockTokenIdentifier> blockToken,
|
||||
long startOffset, long len,
|
||||
int bufferSize, boolean verifyChecksum,
|
||||
String clientName)
|
||||
throws IOException {
|
||||
// in and out will be closed when sock is closed (by the caller)
|
||||
Socket sock = params.getSocket();
|
||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
|
||||
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
|
||||
params.getClientName(), params.getStartOffset(), params.getLen());
|
||||
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
|
||||
|
||||
//
|
||||
// Get bytes in block, set streams
|
||||
//
|
||||
|
||||
DataInputStream in = new DataInputStream(
|
||||
new BufferedInputStream(NetUtils.getInputStream(sock),
|
||||
params.getBufferSize()));
|
||||
|
||||
bufferSize));
|
||||
|
||||
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
|
||||
vintPrefixed(in));
|
||||
RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
|
||||
params.getFile());
|
||||
RemoteBlockReader2.checkSuccess(status, sock, block, file);
|
||||
ReadOpChecksumInfoProto checksumInfo =
|
||||
status.getReadOpChecksumInfo();
|
||||
DataChecksum checksum = DataTransferProtoUtil.fromProto(
|
||||
@ -393,16 +403,15 @@ public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
// Read the first chunk offset.
|
||||
long firstChunkOffset = checksumInfo.getChunkOffset();
|
||||
|
||||
if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
|
||||
firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
|
||||
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
|
||||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
|
||||
throw new IOException("BlockReader: error in first chunk offset (" +
|
||||
firstChunkOffset + ") startOffset is " +
|
||||
params.getStartOffset() + " for file " + params.getFile());
|
||||
startOffset + " for file " + file);
|
||||
}
|
||||
|
||||
return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(),
|
||||
params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
|
||||
params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
|
||||
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
|
||||
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,22 +246,24 @@ private void readTrailingEmptyPacket() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
protected RemoteBlockReader2(BlockReaderFactory.Params params,
|
||||
DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
|
||||
protected RemoteBlockReader2(String file, String bpid, long blockId,
|
||||
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
|
||||
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
|
||||
IOStreamPair ioStreams) {
|
||||
// Path is used only for printing block and file information in debug
|
||||
this.dnSock = params.getSocket();
|
||||
this.ioStreams = params.getIoStreamPair();
|
||||
this.dnSock = dnSock;
|
||||
this.ioStreams = ioStreams;
|
||||
this.in = in;
|
||||
this.checksum = checksum;
|
||||
this.verifyChecksum = params.getVerifyChecksum();
|
||||
this.startOffset = Math.max( params.getStartOffset(), 0 );
|
||||
this.filename = params.getFile();
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
this.startOffset = Math.max( startOffset, 0 );
|
||||
this.filename = file;
|
||||
|
||||
// The total number of bytes that we need to transfer from the DN is
|
||||
// the amount that the user wants (bytesToRead), plus the padding at
|
||||
// the beginning in order to chunk-align. Note that the DN may elect
|
||||
// to send more than this amount if the read starts/ends mid-chunk.
|
||||
this.bytesNeededToFinish = params.getLen() + (startOffset - firstChunkOffset);
|
||||
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
|
||||
bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
||||
checksumSize = this.checksum.getChecksumSize();
|
||||
}
|
||||
@ -371,9 +373,16 @@ public void readFully(byte[] buf, int off, int len) throws IOException {
|
||||
* @param clientName Client name
|
||||
* @return New BlockReader instance, or null on error.
|
||||
*/
|
||||
public static BlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
public static BlockReader newBlockReader(Socket sock, String file,
|
||||
ExtendedBlock block,
|
||||
Token<BlockTokenIdentifier> blockToken,
|
||||
long startOffset, long len,
|
||||
int bufferSize, boolean verifyChecksum,
|
||||
String clientName,
|
||||
DataEncryptionKey encryptionKey,
|
||||
IOStreamPair ioStreams)
|
||||
throws IOException {
|
||||
IOStreamPair ioStreams = params.getIoStreamPair();
|
||||
|
||||
ReadableByteChannel ch;
|
||||
if (ioStreams.in instanceof SocketInputWrapper) {
|
||||
ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
|
||||
@ -384,8 +393,7 @@ public static BlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
// in and out will be closed when sock is closed (by the caller)
|
||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
ioStreams.out));
|
||||
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
|
||||
params.getClientName(), params.getStartOffset(), params.getLen());
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
|
||||
|
||||
//
|
||||
// Get bytes in block
|
||||
@ -394,8 +402,7 @@ public static BlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
|
||||
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
|
||||
vintPrefixed(in));
|
||||
checkSuccess(status, params.getSocket(), params.getBlock(),
|
||||
params.getFile());
|
||||
checkSuccess(status, sock, block, file);
|
||||
ReadOpChecksumInfoProto checksumInfo =
|
||||
status.getReadOpChecksumInfo();
|
||||
DataChecksum checksum = DataTransferProtoUtil.fromProto(
|
||||
@ -405,14 +412,16 @@ public static BlockReader newBlockReader(BlockReaderFactory.Params params)
|
||||
// Read the first chunk offset.
|
||||
long firstChunkOffset = checksumInfo.getChunkOffset();
|
||||
|
||||
if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
|
||||
firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
|
||||
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
|
||||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
|
||||
throw new IOException("BlockReader: error in first chunk offset (" +
|
||||
firstChunkOffset + ") startOffset is " +
|
||||
params.getStartOffset() + " for file " + params.getFile());
|
||||
firstChunkOffset + ") startOffset is " +
|
||||
startOffset + " for file " + file);
|
||||
}
|
||||
|
||||
return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
|
||||
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
|
||||
ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
|
||||
ioStreams);
|
||||
}
|
||||
|
||||
static void checkSuccess(
|
||||
|
@ -44,7 +44,6 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -203,16 +202,14 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
|
||||
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
||||
|
||||
int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
|
||||
|
||||
// Use the block name for file name.
|
||||
|
||||
// Use the block name for file name.
|
||||
String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
|
||||
BlockReader blockReader = BlockReaderFactory.newBlockReader(
|
||||
new BlockReaderFactory.Params(new Conf(conf)).
|
||||
setSocket(s).
|
||||
setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
|
||||
setLen(amtToRead).
|
||||
setEncryptionKey(encryptionKey).
|
||||
setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
|
||||
setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
|
||||
conf, s, file,
|
||||
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
|
||||
offsetIntoBlock, amtToRead, encryptionKey);
|
||||
|
||||
byte[] buf = new byte[(int)amtToRead];
|
||||
int readOffset = 0;
|
||||
int retries = 2;
|
||||
|
@ -40,7 +40,6 @@
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
@ -557,14 +556,12 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
|
||||
s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
|
||||
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
||||
|
||||
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
|
||||
block.getBlockId());
|
||||
blockReader = BlockReaderFactory.newBlockReader(
|
||||
new BlockReaderFactory.Params(new Conf(conf)).
|
||||
setSocket(s).setBlock(block).
|
||||
setFile(BlockReaderFactory.getFileName(targetAddr,
|
||||
block.getBlockPoolId(), block.getBlockId())).
|
||||
setBlockToken(lblock.getBlockToken()).
|
||||
setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
|
||||
setLen(-1));
|
||||
conf, s, file, block, lblock
|
||||
.getBlockToken(), 0, -1,
|
||||
namenode.getRpcServer().getDataEncryptionKey());
|
||||
|
||||
} catch (IOException ex) {
|
||||
// Put chosen node into dead list, continue
|
||||
|
@ -31,7 +31,6 @@
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
@ -151,14 +150,12 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
|
||||
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
||||
|
||||
return BlockReaderFactory.newBlockReader(
|
||||
new BlockReaderFactory.Params(new Conf(conf)).
|
||||
setSocket(sock).
|
||||
setFile(targetAddr.toString() + ":" + block.getBlockId()).
|
||||
setBlock(block).setBlockToken(testBlock.getBlockToken()).
|
||||
setStartOffset(offset).setLen(lenToRead).
|
||||
setBufferSize(conf.getInt(
|
||||
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
|
||||
setVerifyChecksum(true));
|
||||
new DFSClient.Conf(conf),
|
||||
sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
|
||||
testBlock.getBlockToken(),
|
||||
offset, lenToRead,
|
||||
conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
|
||||
true, "", null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -39,7 +39,6 @@
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
@ -146,10 +145,9 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
|
||||
String file = BlockReaderFactory.getFileName(targetAddr,
|
||||
"test-blockpoolid", block.getBlockId());
|
||||
blockReader = BlockReaderFactory.newBlockReader(
|
||||
new BlockReaderFactory.Params(new Conf(conf)).
|
||||
setSocket(s).setBlock(block).setFile(file).
|
||||
setBlockToken(lblock.getBlockToken()).setStartOffset(0).
|
||||
setLen(-1));
|
||||
conf, s, file, block,
|
||||
lblock.getBlockToken(), 0, -1, null);
|
||||
|
||||
} catch (IOException ex) {
|
||||
if (ex instanceof InvalidBlockTokenException) {
|
||||
assertFalse("OP_READ_BLOCK: access token is invalid, "
|
||||
|
@ -32,9 +32,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockReader;
|
||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
||||
import org.apache.hadoop.hdfs.DFSClient.Conf;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
@ -279,13 +277,13 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
|
||||
s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
|
||||
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
||||
|
||||
BlockReader blockReader = BlockReaderFactory.
|
||||
newBlockReader(new BlockReaderFactory.Params(new Conf(conf)).
|
||||
setFile(BlockReaderFactory.getFileName(targetAddr,
|
||||
"test-blockpoolid", block.getBlockId())).
|
||||
setBlock(block).setBlockToken(lblock.getBlockToken()).
|
||||
setSocket(s));
|
||||
blockReader.close();
|
||||
String file = BlockReaderFactory.getFileName(targetAddr,
|
||||
"test-blockpoolid",
|
||||
block.getBlockId());
|
||||
BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
|
||||
.getBlockToken(), 0, -1, null);
|
||||
|
||||
// nothing - if it fails - it will throw and exception
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user