HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
This commit is contained in:
parent
10f0f7851a
commit
f308561f1d
@ -56,6 +56,7 @@
|
|||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
|
||||||
|
|
||||||
@ -34,7 +34,15 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
|
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||||
|
import org.apache.hadoop.hdfs.ReplicaAccessor;
|
||||||
|
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||||
import org.apache.hadoop.hdfs.net.DomainPeer;
|
import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
@ -646,7 +654,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a RemoteBlockReader that communicates over a UNIX domain socket.
|
* Get a BlockReaderRemote that communicates over a UNIX domain socket.
|
||||||
*
|
*
|
||||||
* @return The new BlockReader, or null if we failed to create the block
|
* @return The new BlockReader, or null if we failed to create the block
|
||||||
* reader.
|
* reader.
|
||||||
@ -709,7 +717,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a RemoteBlockReader that communicates over a TCP socket.
|
* Get a BlockReaderRemote that communicates over a TCP socket.
|
||||||
*
|
*
|
||||||
* @return The new BlockReader. We will not return null, but instead throw
|
* @return The new BlockReader. We will not return null, but instead throw
|
||||||
* an exception if this fails.
|
* an exception if this fails.
|
||||||
@ -837,13 +845,13 @@ private static boolean isSecurityException(IOException ioe) {
|
|||||||
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
|
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
|
||||||
int networkDistance = clientContext.getNetworkDistance(datanode);
|
int networkDistance = clientContext.getNetworkDistance(datanode);
|
||||||
if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
|
if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
|
||||||
return RemoteBlockReader.newBlockReader(fileName,
|
return BlockReaderRemote.newBlockReader(fileName,
|
||||||
block, token, startOffset, length, conf.getIoBufferSize(),
|
block, token, startOffset, length, conf.getIoBufferSize(),
|
||||||
verifyChecksum, clientName, peer, datanode,
|
verifyChecksum, clientName, peer, datanode,
|
||||||
clientContext.getPeerCache(), cachingStrategy, tracer,
|
clientContext.getPeerCache(), cachingStrategy, tracer,
|
||||||
networkDistance);
|
networkDistance);
|
||||||
} else {
|
} else {
|
||||||
return RemoteBlockReader2.newBlockReader(
|
return BlockReaderRemote2.newBlockReader(
|
||||||
fileName, block, token, startOffset, length,
|
fileName, block, token, startOffset, length,
|
||||||
verifyChecksum, clientName, peer, datanode,
|
verifyChecksum, clientName, peer, datanode,
|
||||||
clientContext.getPeerCache(), cachingStrategy, tracer,
|
clientContext.getPeerCache(), cachingStrategy, tracer,
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -25,6 +25,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -33,8 +33,9 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
@ -29,6 +29,8 @@
|
|||||||
import org.apache.hadoop.fs.FSInputChecker;
|
import org.apache.hadoop.fs.FSInputChecker;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.PeerCache;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
@ -54,13 +56,13 @@
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated this is an old implementation that is being left around
|
* @deprecated this is an old implementation that is being left around
|
||||||
* in case any issues spring up with the new {@link RemoteBlockReader2}
|
* in case any issues spring up with the new {@link BlockReaderRemote2}
|
||||||
* implementation.
|
* implementation.
|
||||||
* It will be removed in the next release.
|
* It will be removed in the next release.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
public class BlockReaderRemote extends FSInputChecker implements BlockReader {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
|
static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
|
||||||
|
|
||||||
private final Peer peer;
|
private final Peer peer;
|
||||||
@ -209,7 +211,7 @@ protected synchronized int readChunk(long pos, byte[] buf, int offset,
|
|||||||
int len, byte[] checksumBuf)
|
int len, byte[] checksumBuf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
try (TraceScope ignored = tracer.newScope(
|
||||||
"RemoteBlockReader#readChunk(" + blockId + ")")) {
|
"BlockReaderRemote#readChunk(" + blockId + ")")) {
|
||||||
return readChunkImpl(pos, buf, offset, len, checksumBuf);
|
return readChunkImpl(pos, buf, offset, len, checksumBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -335,7 +337,7 @@ private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
|
|||||||
return bytesToRead;
|
return bytesToRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RemoteBlockReader(String file, String bpid, long blockId,
|
private BlockReaderRemote(String file, String bpid, long blockId,
|
||||||
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
|
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
|
||||||
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
|
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
|
||||||
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
|
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
|
||||||
@ -386,7 +388,7 @@ private RemoteBlockReader(String file, String bpid, long blockId,
|
|||||||
* @param clientName Client name
|
* @param clientName Client name
|
||||||
* @return New BlockReader instance, or null on error.
|
* @return New BlockReader instance, or null on error.
|
||||||
*/
|
*/
|
||||||
public static RemoteBlockReader newBlockReader(String file,
|
public static BlockReaderRemote newBlockReader(String file,
|
||||||
ExtendedBlock block,
|
ExtendedBlock block,
|
||||||
Token<BlockTokenIdentifier> blockToken,
|
Token<BlockTokenIdentifier> blockToken,
|
||||||
long startOffset, long len,
|
long startOffset, long len,
|
||||||
@ -412,7 +414,7 @@ public static RemoteBlockReader newBlockReader(String file,
|
|||||||
|
|
||||||
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
|
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
|
||||||
PBHelperClient.vintPrefixed(in));
|
PBHelperClient.vintPrefixed(in));
|
||||||
RemoteBlockReader2.checkSuccess(status, peer, block, file);
|
BlockReaderRemote2.checkSuccess(status, peer, block, file);
|
||||||
ReadOpChecksumInfoProto checksumInfo =
|
ReadOpChecksumInfoProto checksumInfo =
|
||||||
status.getReadOpChecksumInfo();
|
status.getReadOpChecksumInfo();
|
||||||
DataChecksum checksum = DataTransferProtoUtil.fromProto(
|
DataChecksum checksum = DataTransferProtoUtil.fromProto(
|
||||||
@ -429,7 +431,7 @@ public static RemoteBlockReader newBlockReader(String file,
|
|||||||
startOffset + " for file " + file);
|
startOffset + " for file " + file);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
|
return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(),
|
||||||
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
|
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
|
||||||
peer, datanodeID, peerCache, tracer, networkDistance);
|
peer, datanodeID, peerCache, tracer, networkDistance);
|
||||||
}
|
}
|
||||||
@ -467,7 +469,7 @@ public int readAll(byte[] buf, int offset, int len) throws IOException {
|
|||||||
void sendReadResult(Peer peer, Status statusCode) {
|
void sendReadResult(Peer peer, Status statusCode) {
|
||||||
assert !sentStatusCode : "already sent status code to " + peer;
|
assert !sentStatusCode : "already sent status code to " + peer;
|
||||||
try {
|
try {
|
||||||
RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
|
BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode);
|
||||||
sentStatusCode = true;
|
sentStatusCode = true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// It's ok not to be able to send this. But something is probably wrong.
|
// It's ok not to be able to send this. But something is probably wrong.
|
||||||
@ -478,14 +480,14 @@ void sendReadResult(Peer peer, Status statusCode) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(ByteBuffer buf) throws IOException {
|
public int read(ByteBuffer buf) throws IOException {
|
||||||
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
|
throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int available() {
|
public int available() {
|
||||||
// An optimistic estimate of how much data is available
|
// An optimistic estimate of how much data is available
|
||||||
// to us without doing network I/O.
|
// to us without doing network I/O.
|
||||||
return RemoteBlockReader2.TCP_WINDOW_SIZE;
|
return BlockReaderRemote2.TCP_WINDOW_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
@ -30,6 +30,8 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.PeerCache;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
@ -78,13 +80,13 @@
|
|||||||
*
|
*
|
||||||
* This is a new implementation introduced in Hadoop 0.23 which
|
* This is a new implementation introduced in Hadoop 0.23 which
|
||||||
* is more efficient and simpler than the older BlockReader
|
* is more efficient and simpler than the older BlockReader
|
||||||
* implementation. It should be renamed to RemoteBlockReader
|
* implementation. It should be renamed to BlockReaderRemote
|
||||||
* once we are confident in it.
|
* once we are confident in it.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RemoteBlockReader2 implements BlockReader {
|
public class BlockReaderRemote2 implements BlockReader {
|
||||||
|
|
||||||
static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
|
static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class);
|
||||||
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
|
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
|
||||||
|
|
||||||
final private Peer peer;
|
final private Peer peer;
|
||||||
@ -138,7 +140,7 @@ public synchronized int read(byte[] buf, int off, int len)
|
|||||||
if (curDataSlice == null ||
|
if (curDataSlice == null ||
|
||||||
curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
try (TraceScope ignored = tracer.newScope(
|
||||||
"RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
|
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
|
||||||
readNextPacket();
|
readNextPacket();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -162,7 +164,7 @@ public synchronized int read(ByteBuffer buf) throws IOException {
|
|||||||
if (curDataSlice == null ||
|
if (curDataSlice == null ||
|
||||||
(curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
|
(curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
|
||||||
try (TraceScope ignored = tracer.newScope(
|
try (TraceScope ignored = tracer.newScope(
|
||||||
"RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
|
"BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
|
||||||
readNextPacket();
|
readNextPacket();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -273,7 +275,7 @@ private void readTrailingEmptyPacket() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RemoteBlockReader2(String file, long blockId,
|
protected BlockReaderRemote2(String file, long blockId,
|
||||||
DataChecksum checksum, boolean verifyChecksum,
|
DataChecksum checksum, boolean verifyChecksum,
|
||||||
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
|
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
|
||||||
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
|
DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
|
||||||
@ -425,7 +427,7 @@ public static BlockReader newBlockReader(String file,
|
|||||||
startOffset + " for file " + file);
|
startOffset + " for file " + file);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new RemoteBlockReader2(file, block.getBlockId(), checksum,
|
return new BlockReaderRemote2(file, block.getBlockId(), checksum,
|
||||||
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
|
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
|
||||||
peerCache, tracer, networkDistance);
|
peerCache, tracer, networkDistance);
|
||||||
}
|
}
|
@ -15,9 +15,10 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -23,6 +23,8 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.ReplicaAccessor;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
@ -133,7 +133,7 @@
|
|||||||
|
|
||||||
<!-- Don't complain about LocalDatanodeInfo's anonymous class -->
|
<!-- Don't complain about LocalDatanodeInfo's anonymous class -->
|
||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
|
<Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" />
|
||||||
<Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
|
<Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
|
||||||
</Match>
|
</Match>
|
||||||
<!-- Only one method increments numFailedVolumes and it is synchronized -->
|
<!-- Only one method increments numFailedVolumes and it is synchronized -->
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
import org.apache.hadoop.hdfs.RemoteBlockReader2;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
@ -112,7 +112,7 @@ private BlockReader createBlockReader(long offsetInBlock) {
|
|||||||
*
|
*
|
||||||
* TODO: add proper tracer
|
* TODO: add proper tracer
|
||||||
*/
|
*/
|
||||||
return RemoteBlockReader2.newBlockReader(
|
return BlockReaderRemote2.newBlockReader(
|
||||||
"dummy", block, blockToken, offsetInBlock,
|
"dummy", block, blockToken, offsetInBlock,
|
||||||
block.getNumBytes() - offsetInBlock, true,
|
block.getNumBytes() - offsetInBlock, true,
|
||||||
"", newConnectedPeer(block, dnAddr, blockToken, source), source,
|
"", newConnectedPeer(block, dnAddr, blockToken, source), source,
|
||||||
|
@ -43,7 +43,7 @@
|
|||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
@ -38,7 +38,7 @@
|
|||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.ClientContext;
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
@ -48,7 +48,6 @@
|
|||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -32,6 +32,15 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FsTracer;
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
@ -224,7 +233,7 @@ public DataNode getDataNode(LocatedBlock testBlock) {
|
|||||||
int ipcport = nodes[0].getIpcPort();
|
int ipcport = nodes[0].getIpcPort();
|
||||||
return cluster.getDataNode(ipcport);
|
return cluster.getDataNode(ipcport);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void enableHdfsCachingTracing() {
|
public static void enableHdfsCachingTracing() {
|
||||||
LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
|
LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
|
||||||
Level.TRACE);
|
Level.TRACE);
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
@ -24,6 +24,9 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
||||||
@ -41,6 +41,11 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
@ -125,7 +130,7 @@ public void testFallbackFromShortCircuitToUnixDomainTraffic()
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the case where we have multiple threads waiting on the
|
* Test the case where we have multiple threads waiting on the
|
||||||
* ShortCircuitCache delivering a certain ShortCircuitReplica.
|
* ShortCircuitCache delivering a certain ShortCircuitReplica.
|
||||||
@ -200,7 +205,7 @@ public void run() {
|
|||||||
* Test the case where we have a failure to complete a short circuit read
|
* Test the case where we have a failure to complete a short circuit read
|
||||||
* that occurs, and then later on, we have a success.
|
* that occurs, and then later on, we have a success.
|
||||||
* Any thread waiting on a cache load should receive the failure (if it
|
* Any thread waiting on a cache load should receive the failure (if it
|
||||||
* occurs); however, the failure result should not be cached. We want
|
* occurs); however, the failure result should not be cached. We want
|
||||||
* to be able to retry later and succeed.
|
* to be able to retry later and succeed.
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
@ -244,7 +249,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
// First time should fail.
|
// First time should fail.
|
||||||
List<LocatedBlock> locatedBlocks =
|
List<LocatedBlock> locatedBlocks =
|
||||||
cluster.getNameNode().getRpcServer().getBlockLocations(
|
cluster.getNameNode().getRpcServer().getBlockLocations(
|
||||||
TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
|
TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
|
||||||
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
LocatedBlock lblock = locatedBlocks.get(0); // first block
|
||||||
@ -253,7 +258,7 @@ public void run() {
|
|||||||
blockReader = BlockReaderTestUtil.getBlockReader(
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
Assert.fail("expected getBlockReader to fail the first time.");
|
Assert.fail("expected getBlockReader to fail the first time.");
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
Assert.assertTrue("expected to see 'TCP reads were disabled " +
|
Assert.assertTrue("expected to see 'TCP reads were disabled " +
|
||||||
"for testing' in exception " + t, t.getMessage().contains(
|
"for testing' in exception " + t, t.getMessage().contains(
|
||||||
"TCP reads were disabled for testing"));
|
"TCP reads were disabled for testing"));
|
||||||
@ -267,7 +272,7 @@ public void run() {
|
|||||||
try {
|
try {
|
||||||
blockReader = BlockReaderTestUtil.getBlockReader(
|
blockReader = BlockReaderTestUtil.getBlockReader(
|
||||||
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.error("error trying to retrieve a block reader " +
|
LOG.error("error trying to retrieve a block reader " +
|
||||||
"the second time.", t);
|
"the second time.", t);
|
||||||
throw t;
|
throw t;
|
||||||
@ -327,7 +332,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
|
|||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.dfs.getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache();
|
||||||
final DatanodeInfo datanode =
|
final DatanodeInfo datanode =
|
||||||
new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
|
new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
|
||||||
cache.getDfsClientShmManager().visit(new Visitor() {
|
cache.getDfsClientShmManager().visit(new Visitor() {
|
||||||
@ -344,7 +349,7 @@ public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that a client which does not support short-circuit reads using
|
* Test that a client which does not support short-circuit reads using
|
||||||
* shared memory can talk with a server which supports it.
|
* shared memory can talk with a server which supports it.
|
||||||
@ -375,12 +380,12 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
|
|||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.dfs.getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache();
|
||||||
Assert.assertEquals(null, cache.getDfsClientShmManager());
|
Assert.assertEquals(null, cache.getDfsClientShmManager());
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test shutting down the ShortCircuitCache while there are things in it.
|
* Test shutting down the ShortCircuitCache while there are things in it.
|
||||||
*/
|
*/
|
||||||
@ -407,7 +412,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
|
|||||||
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
|
||||||
Assert.assertTrue(Arrays.equals(contents, expected));
|
Assert.assertTrue(Arrays.equals(contents, expected));
|
||||||
final ShortCircuitCache cache =
|
final ShortCircuitCache cache =
|
||||||
fs.dfs.getClientContext().getShortCircuitCache();
|
fs.getClient().getClientContext().getShortCircuitCache();
|
||||||
cache.close();
|
cache.close();
|
||||||
Assert.assertTrue(cache.getDfsClientShmManager().
|
Assert.assertTrue(cache.getDfsClientShmManager().
|
||||||
getDomainSocketWatcher().isClosed());
|
getDomainSocketWatcher().isClosed());
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
@ -32,9 +32,15 @@
|
|||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
||||||
@ -55,24 +61,24 @@
|
|||||||
|
|
||||||
public class TestBlockReaderLocal {
|
public class TestBlockReaderLocal {
|
||||||
private static TemporarySocketDirectory sockDir;
|
private static TemporarySocketDirectory sockDir;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() {
|
public static void init() {
|
||||||
sockDir = new TemporarySocketDirectory();
|
sockDir = new TemporarySocketDirectory();
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void shutdown() throws IOException {
|
public static void shutdown() throws IOException {
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
|
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
|
||||||
int off2, int len) {
|
int off2, int len) {
|
||||||
for (int i = 0; i < len; i++) {
|
for (int i = 0; i < len; i++) {
|
||||||
if (buf1[off1 + i] != buf2[off2 + i]) {
|
if (buf1[off1 + i] != buf2[off2 + i]) {
|
||||||
Assert.fail("arrays differ at byte " + i + ". " +
|
Assert.fail("arrays differ at byte " + i + ". " +
|
||||||
"The first array has " + (int)buf1[off1 + i] +
|
"The first array has " + (int)buf1[off1 + i] +
|
||||||
", but the second array has " + (int)buf2[off2 + i]);
|
", but the second array has " + (int)buf2[off2 + i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -85,7 +91,7 @@ public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
|
|||||||
* @param buf The ByteBuffer to read into
|
* @param buf The ByteBuffer to read into
|
||||||
* @param off The offset in the buffer to read into
|
* @param off The offset in the buffer to read into
|
||||||
* @param len The number of bytes to read.
|
* @param len The number of bytes to read.
|
||||||
*
|
*
|
||||||
* @throws IOException If it could not read the requested number of bytes
|
* @throws IOException If it could not read the requested number of bytes
|
||||||
*/
|
*/
|
||||||
private static void readFully(BlockReaderLocal reader,
|
private static void readFully(BlockReaderLocal reader,
|
||||||
@ -120,7 +126,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
// default: no-op
|
// default: no-op
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
||||||
boolean checksum, long readahead) throws IOException {
|
boolean checksum, long readahead) throws IOException {
|
||||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||||
@ -139,7 +145,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
|||||||
BlockReaderLocal blockReaderLocal = null;
|
BlockReaderLocal blockReaderLocal = null;
|
||||||
FSDataInputStream fsIn = null;
|
FSDataInputStream fsIn = null;
|
||||||
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
|
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
|
||||||
|
|
||||||
FileSystem fs = null;
|
FileSystem fs = null;
|
||||||
ShortCircuitShm shm = null;
|
ShortCircuitShm shm = null;
|
||||||
RandomAccessFile raf = null;
|
RandomAccessFile raf = null;
|
||||||
@ -186,7 +192,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
|||||||
raf.setLength(8192);
|
raf.setLength(8192);
|
||||||
FileInputStream shmStream = new FileInputStream(raf.getFD());
|
FileInputStream shmStream = new FileInputStream(raf.getFD());
|
||||||
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
|
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
|
||||||
ShortCircuitReplica replica =
|
ShortCircuitReplica replica =
|
||||||
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
|
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
|
||||||
Time.now(), shm.allocAndRegisterSlot(
|
Time.now(), shm.allocAndRegisterSlot(
|
||||||
ExtendedBlockId.fromExtendedBlock(block)));
|
ExtendedBlockId.fromExtendedBlock(block)));
|
||||||
@ -216,21 +222,21 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
|||||||
if (raf != null) raf.close();
|
if (raf != null) raf.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalImmediateClose
|
private static class TestBlockReaderLocalImmediateClose
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalImmediateClose() throws IOException {
|
public void testBlockReaderLocalImmediateClose() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
|
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
|
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderSimpleReads
|
private static class TestBlockReaderSimpleReads
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte buf[] = new byte[TEST_LENGTH];
|
||||||
reader.readFully(buf, 0, 512);
|
reader.readFully(buf, 0, 512);
|
||||||
@ -246,7 +252,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderSimpleReads() throws IOException {
|
public void testBlockReaderSimpleReads() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
|
||||||
@ -275,11 +281,11 @@ public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalArrayReads2
|
private static class TestBlockReaderLocalArrayReads2
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte buf[] = new byte[TEST_LENGTH];
|
||||||
reader.readFully(buf, 0, 10);
|
reader.readFully(buf, 0, 10);
|
||||||
@ -296,7 +302,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
|
assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalArrayReads2() throws IOException {
|
public void testBlockReaderLocalArrayReads2() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
||||||
@ -322,10 +328,10 @@ public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
|
|||||||
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalByteBufferReads
|
private static class TestBlockReaderLocalByteBufferReads
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
|
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
|
||||||
readFully(reader, buf, 0, 10);
|
readFully(reader, buf, 0, 10);
|
||||||
@ -339,7 +345,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
|
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalByteBufferReads()
|
public void testBlockReaderLocalByteBufferReads()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -354,7 +360,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum()
|
|||||||
new TestBlockReaderLocalByteBufferReads(),
|
new TestBlockReaderLocalByteBufferReads(),
|
||||||
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalByteBufferReadsNoReadahead()
|
public void testBlockReaderLocalByteBufferReadsNoReadahead()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -373,7 +379,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
|
|||||||
* Test reads that bypass the bounce buffer (because they are aligned
|
* Test reads that bypass the bounce buffer (because they are aligned
|
||||||
* and bigger than the readahead).
|
* and bigger than the readahead).
|
||||||
*/
|
*/
|
||||||
private static class TestBlockReaderLocalByteBufferFastLaneReads
|
private static class TestBlockReaderLocalByteBufferFastLaneReads
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
@ -410,7 +416,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
50);
|
50);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalByteBufferFastLaneReads()
|
public void testBlockReaderLocalByteBufferFastLaneReads()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -456,7 +462,7 @@ public void setup(File blockFile, boolean usingChecksums)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte buf[] = new byte[TEST_LENGTH];
|
||||||
if (usingChecksums) {
|
if (usingChecksums) {
|
||||||
@ -471,19 +477,19 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalReadCorruptStart()
|
public void testBlockReaderLocalReadCorruptStart()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
|
||||||
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalReadCorrupt
|
private static class TestBlockReaderLocalReadCorrupt
|
||||||
extends BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
boolean usingChecksums = false;
|
boolean usingChecksums = false;
|
||||||
@Override
|
@Override
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
RandomAccessFile bf = null;
|
RandomAccessFile bf = null;
|
||||||
this.usingChecksums = usingChecksums;
|
this.usingChecksums = usingChecksums;
|
||||||
@ -496,7 +502,7 @@ public void setup(File blockFile, boolean usingChecksums)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte buf[] = new byte[TEST_LENGTH];
|
||||||
try {
|
try {
|
||||||
@ -522,7 +528,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalReadCorrupt()
|
public void testBlockReaderLocalReadCorrupt()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -555,7 +561,7 @@ private static class TestBlockReaderLocalWithMlockChanges
|
|||||||
public void setup(File blockFile, boolean usingChecksums)
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -675,7 +681,7 @@ public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
|
|||||||
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
||||||
false, 0);
|
false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalReadZeroBytes()
|
public void testBlockReaderLocalReadZeroBytes()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -703,7 +709,7 @@ public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
|
|||||||
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
|
||||||
false, 0);
|
false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
|
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
|
||||||
@ -714,7 +720,7 @@ public void TestStatisticsForShortCircuitLocalRead() throws Exception {
|
|||||||
public void TestStatisticsForLocalRead() throws Exception {
|
public void TestStatisticsForLocalRead() throws Exception {
|
||||||
testStatistics(false);
|
testStatistics(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testStatistics(boolean isShortCircuit) throws Exception {
|
private void testStatistics(boolean isShortCircuit) throws Exception {
|
||||||
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
|
Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
@ -756,12 +762,12 @@ private void testStatistics(boolean isShortCircuit) throws Exception {
|
|||||||
IOUtils.readFully(fsIn, original, 0,
|
IOUtils.readFully(fsIn, original, 0,
|
||||||
BlockReaderLocalTest.TEST_LENGTH);
|
BlockReaderLocalTest.TEST_LENGTH);
|
||||||
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
|
||||||
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
||||||
dfsIn.getReadStatistics().getTotalBytesRead());
|
dfsIn.getReadStatistics().getTotalBytesRead());
|
||||||
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
||||||
dfsIn.getReadStatistics().getTotalLocalBytesRead());
|
dfsIn.getReadStatistics().getTotalLocalBytesRead());
|
||||||
if (isShortCircuit) {
|
if (isShortCircuit) {
|
||||||
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
|
||||||
dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
|
dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
@ -15,7 +15,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -30,6 +30,13 @@
|
|||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
@ -52,7 +59,7 @@ public static void setupCluster() throws IOException {
|
|||||||
DFSInputStream.tcpReadsDisabledForTesting = true;
|
DFSInputStream.tcpReadsDisabledForTesting = true;
|
||||||
DomainSocket.disableBindPathValidation();
|
DomainSocket.disableBindPathValidation();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HdfsConfiguration getConfiguration(
|
private static HdfsConfiguration getConfiguration(
|
||||||
TemporarySocketDirectory socketDir) throws IOException {
|
TemporarySocketDirectory socketDir) throws IOException {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
@ -84,7 +91,7 @@ private static HdfsConfiguration getConfiguration(
|
|||||||
public void testStablePositionAfterCorruptRead() throws Exception {
|
public void testStablePositionAfterCorruptRead() throws Exception {
|
||||||
final short REPL_FACTOR = 1;
|
final short REPL_FACTOR = 1;
|
||||||
final long FILE_LENGTH = 512L;
|
final long FILE_LENGTH = 512L;
|
||||||
|
|
||||||
HdfsConfiguration conf = getConfiguration(null);
|
HdfsConfiguration conf = getConfiguration(null);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
@ -15,11 +15,12 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
|
||||||
public class TestRemoteBlockReader extends TestBlockReaderBase {
|
public class TestBlockReaderRemote extends TestBlockReaderBase {
|
||||||
|
|
||||||
HdfsConfiguration createConf() {
|
HdfsConfiguration createConf() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
@ -15,9 +15,11 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
public class TestRemoteBlockReader2 extends TestBlockReaderBase {
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
|
||||||
|
public class TestBlockReaderRemote2 extends TestBlockReaderBase {
|
||||||
HdfsConfiguration createConf() {
|
HdfsConfiguration createConf() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
return conf;
|
return conf;
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs.client.impl;
|
||||||
|
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
@ -25,6 +25,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
@ -41,7 +42,7 @@ public class TestClientBlockVerification {
|
|||||||
static LocatedBlock testBlock = null;
|
static LocatedBlock testBlock = null;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
|
GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL);
|
||||||
}
|
}
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
@ -57,7 +58,7 @@ public static void setupCluster() throws Exception {
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBlockVerification() throws Exception {
|
public void testBlockVerification() throws Exception {
|
||||||
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
|
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
|
||||||
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
|
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
|
||||||
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
|
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
|
||||||
verify(reader).sendReadResult(Status.CHECKSUM_OK);
|
verify(reader).sendReadResult(Status.CHECKSUM_OK);
|
||||||
@ -69,7 +70,7 @@ public void testBlockVerification() throws Exception {
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testIncompleteRead() throws Exception {
|
public void testIncompleteRead() throws Exception {
|
||||||
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
|
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
|
||||||
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
|
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
|
||||||
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
|
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
|
||||||
|
|
||||||
@ -87,7 +88,7 @@ public void testIncompleteRead() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testCompletePartialRead() throws Exception {
|
public void testCompletePartialRead() throws Exception {
|
||||||
// Ask for half the file
|
// Ask for half the file
|
||||||
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
|
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
|
||||||
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
|
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
|
||||||
// And read half the file
|
// And read half the file
|
||||||
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
|
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
|
||||||
@ -107,7 +108,7 @@ public void testUnalignedReads() throws Exception {
|
|||||||
for (int length : lengths) {
|
for (int length : lengths) {
|
||||||
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
|
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
|
||||||
" len=" + length);
|
" len=" + length);
|
||||||
RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
|
BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
|
||||||
util.getBlockReader(testBlock, startOffset, length));
|
util.getBlockReader(testBlock, startOffset, length));
|
||||||
util.readAndCheckEOS(reader, length, true);
|
util.readAndCheckEOS(reader, length, true);
|
||||||
verify(reader).sendReadResult(Status.CHECKSUM_OK);
|
verify(reader).sendReadResult(Status.CHECKSUM_OK);
|
@ -35,7 +35,7 @@
|
|||||||
import org.apache.hadoop.fs.FsTracer;
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.ClientContext;
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -42,7 +42,7 @@
|
|||||||
import org.apache.hadoop.fs.FsTracer;
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.ClientContext;
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
@ -43,7 +43,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
@ -59,7 +59,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
|
||||||
|
@ -27,7 +27,7 @@
|
|||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -55,7 +55,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -39,8 +39,8 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -45,7 +45,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
|
import org.apache.hadoop.hdfs.client.impl.TestBlockReaderLocal;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
@ -54,7 +54,6 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
@ -593,7 +592,7 @@ public void testReadWithRemoteBlockReader()
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that file data can be read by reading the block
|
* Test that file data can be read by reading the block
|
||||||
* through RemoteBlockReader
|
* through BlockReaderRemote
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void doTestShortCircuitReadWithRemoteBlockReader(
|
public void doTestShortCircuitReadWithRemoteBlockReader(
|
||||||
@ -623,9 +622,9 @@ public void doTestShortCircuitReadWithRemoteBlockReader(
|
|||||||
try {
|
try {
|
||||||
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
|
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
|
||||||
conf, shortCircuitFails);
|
conf, shortCircuitFails);
|
||||||
//RemoteBlockReader have unsupported method read(ByteBuffer bf)
|
//BlockReaderRemote have unsupported method read(ByteBuffer bf)
|
||||||
assertTrue(
|
assertTrue(
|
||||||
"RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
"BlockReaderRemote unsupported method read(ByteBuffer bf) error",
|
||||||
checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
|
Loading…
Reference in New Issue
Block a user