HDFS-8117. More accurate verification in SimulatedFSDataset: replace DEFAULT_DATABYTE with patterned data. Contributed by Zhe Zhang.
This commit is contained in:
parent
7fc50e2525
commit
d60e22152a
@ -431,6 +431,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-8083. Move dfs.client.write.* conf from DFSConfigKeys to
|
HDFS-8083. Move dfs.client.write.* conf from DFSConfigKeys to
|
||||||
HdfsClientConfigKeys.Write. (szetszwo)
|
HdfsClientConfigKeys.Write. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-8117. More accurate verification in SimulatedFSDataset: replace
|
||||||
|
DEFAULT_DATABYTE with patterned data. (Zhe Zhang via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
@ -91,6 +91,7 @@
|
|||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
@ -117,6 +118,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
|
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
||||||
@ -1769,4 +1771,27 @@ public static void resetLastUpdatesWithOffset(DatanodeInfo dn, long offset) {
|
|||||||
dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
|
dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method takes a set of block locations and fills the provided buffer
|
||||||
|
* with expected bytes based on simulated content from
|
||||||
|
* {@link SimulatedFSDataset}.
|
||||||
|
*
|
||||||
|
* @param lbs The block locations of a file
|
||||||
|
* @param expected The buffer to be filled with expected bytes on the above
|
||||||
|
* locations.
|
||||||
|
*/
|
||||||
|
public static void fillExpectedBuf(LocatedBlocks lbs, byte[] expected) {
|
||||||
|
Block[] blks = new Block[lbs.getLocatedBlocks().size()];
|
||||||
|
for (int i = 0; i < lbs.getLocatedBlocks().size(); i++) {
|
||||||
|
blks[i] = lbs.getLocatedBlocks().get(i).getBlock().getLocalBlock();
|
||||||
|
}
|
||||||
|
int bufPos = 0;
|
||||||
|
for (Block b : blks) {
|
||||||
|
for (long blkPos = 0; blkPos < b.getNumBytes(); blkPos++) {
|
||||||
|
assert bufPos < expected.length;
|
||||||
|
expected[bufPos++] = SimulatedFSDataset.simulatedByte(b, blkPos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ private void writeFile(FSDataOutputStream stm) throws IOException {
|
|||||||
//
|
//
|
||||||
// verify that the data written to the full blocks are sane
|
// verify that the data written to the full blocks are sane
|
||||||
//
|
//
|
||||||
private void checkFile(FileSystem fileSys, Path name, int repl)
|
private void checkFile(DistributedFileSystem fileSys, Path name, int repl)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
|
|
||||||
@ -96,9 +96,9 @@ private void checkFile(FileSystem fileSys, Path name, int repl)
|
|||||||
byte[] expected =
|
byte[] expected =
|
||||||
new byte[AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE];
|
new byte[AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE];
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
for (int i= 0; i < expected.length; i++) {
|
LocatedBlocks lbs = fileSys.getClient().getLocatedBlocks(name.toString(),
|
||||||
expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
|
0, AppendTestUtil.FILE_SIZE);
|
||||||
}
|
DFSTestUtil.fillExpectedBuf(lbs, expected);
|
||||||
} else {
|
} else {
|
||||||
System.arraycopy(fileContents, 0, expected, 0, expected.length);
|
System.arraycopy(fileContents, 0, expected, 0, expected.length);
|
||||||
}
|
}
|
||||||
@ -193,7 +193,7 @@ public void testSimpleFlush() throws IOException {
|
|||||||
}
|
}
|
||||||
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// create a new file.
|
// create a new file.
|
||||||
@ -249,7 +249,7 @@ public void testComplexFlush() throws IOException {
|
|||||||
}
|
}
|
||||||
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
FileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// create a new file.
|
// create a new file.
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
@ -54,6 +55,8 @@
|
|||||||
public class TestPread {
|
public class TestPread {
|
||||||
static final long seed = 0xDEADBEEFL;
|
static final long seed = 0xDEADBEEFL;
|
||||||
static final int blockSize = 4096;
|
static final int blockSize = 4096;
|
||||||
|
static final int numBlocksPerFile = 12;
|
||||||
|
static final int fileSize = numBlocksPerFile * blockSize;
|
||||||
boolean simulatedStorage;
|
boolean simulatedStorage;
|
||||||
boolean isHedgedRead;
|
boolean isHedgedRead;
|
||||||
|
|
||||||
@ -66,10 +69,10 @@ public void setup() {
|
|||||||
private void writeFile(FileSystem fileSys, Path name) throws IOException {
|
private void writeFile(FileSystem fileSys, Path name) throws IOException {
|
||||||
int replication = 3;// We need > 1 blocks to test out the hedged reads.
|
int replication = 3;// We need > 1 blocks to test out the hedged reads.
|
||||||
// test empty file open and read
|
// test empty file open and read
|
||||||
DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
|
DFSTestUtil.createFile(fileSys, name, fileSize, 0,
|
||||||
blockSize, (short)replication, seed);
|
blockSize, (short)replication, seed);
|
||||||
FSDataInputStream in = fileSys.open(name);
|
FSDataInputStream in = fileSys.open(name);
|
||||||
byte[] buffer = new byte[12 * blockSize];
|
byte[] buffer = new byte[fileSize];
|
||||||
in.readFully(0, buffer, 0, 0);
|
in.readFully(0, buffer, 0, 0);
|
||||||
IOException res = null;
|
IOException res = null;
|
||||||
try { // read beyond the end of the file
|
try { // read beyond the end of the file
|
||||||
@ -84,7 +87,7 @@ private void writeFile(FileSystem fileSys, Path name) throws IOException {
|
|||||||
assertTrue("Cannot delete file", false);
|
assertTrue("Cannot delete file", false);
|
||||||
|
|
||||||
// now create the real file
|
// now create the real file
|
||||||
DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
|
DFSTestUtil.createFile(fileSys, name, fileSize, fileSize,
|
||||||
blockSize, (short) replication, seed);
|
blockSize, (short) replication, seed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,11 +131,13 @@ private void doPread(FSDataInputStream stm, long position, byte[] buffer,
|
|||||||
|
|
||||||
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
|
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
|
||||||
FSDataInputStream stm = fileSys.open(name);
|
FSDataInputStream stm = fileSys.open(name);
|
||||||
byte[] expected = new byte[12 * blockSize];
|
byte[] expected = new byte[fileSize];
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
for (int i= 0; i < expected.length; i++) {
|
assert fileSys instanceof DistributedFileSystem;
|
||||||
expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
|
DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
|
||||||
}
|
LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(name.toString(),
|
||||||
|
0, fileSize);
|
||||||
|
DFSTestUtil.fillExpectedBuf(lbs, expected);
|
||||||
} else {
|
} else {
|
||||||
Random rand = new Random(seed);
|
Random rand = new Random(seed);
|
||||||
rand.nextBytes(expected);
|
rand.nextBytes(expected);
|
||||||
@ -447,7 +452,7 @@ private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean
|
|||||||
FileSystem fileSys = cluster.getFileSystem();
|
FileSystem fileSys = cluster.getFileSystem();
|
||||||
fileSys.setVerifyChecksum(verifyChecksum);
|
fileSys.setVerifyChecksum(verifyChecksum);
|
||||||
try {
|
try {
|
||||||
Path file1 = new Path("preadtest.dat");
|
Path file1 = new Path("/preadtest.dat");
|
||||||
writeFile(fileSys, file1);
|
writeFile(fileSys, file1);
|
||||||
pReadFile(fileSys, file1);
|
pReadFile(fileSys, file1);
|
||||||
datanodeRestartTest(cluster, fileSys, file1);
|
datanodeRestartTest(cluster, fileSys, file1);
|
||||||
|
@ -25,11 +25,10 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -52,16 +51,17 @@ private void checkAndEraseData(byte[] actual, int from, byte[] expected, String
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkFile(FileSystem fileSys, Path name) throws IOException {
|
private void checkFile(DistributedFileSystem fileSys, Path name)
|
||||||
|
throws IOException {
|
||||||
BlockLocation[] locations = fileSys.getFileBlockLocations(
|
BlockLocation[] locations = fileSys.getFileBlockLocations(
|
||||||
fileSys.getFileStatus(name), 0, fileSize);
|
fileSys.getFileStatus(name), 0, fileSize);
|
||||||
assertEquals("Number of blocks", fileSize, locations.length);
|
assertEquals("Number of blocks", fileSize, locations.length);
|
||||||
FSDataInputStream stm = fileSys.open(name);
|
FSDataInputStream stm = fileSys.open(name);
|
||||||
byte[] expected = new byte[fileSize];
|
byte[] expected = new byte[fileSize];
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
for (int i = 0; i < expected.length; ++i) {
|
LocatedBlocks lbs = fileSys.getClient().getLocatedBlocks(name.toString(),
|
||||||
expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
|
0, fileSize);
|
||||||
}
|
DFSTestUtil.fillExpectedBuf(lbs, expected);
|
||||||
} else {
|
} else {
|
||||||
Random rand = new Random(seed);
|
Random rand = new Random(seed);
|
||||||
rand.nextBytes(expected);
|
rand.nextBytes(expected);
|
||||||
@ -90,9 +90,9 @@ public void testSmallBlock() throws IOException {
|
|||||||
}
|
}
|
||||||
conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1");
|
conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1");
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
FileSystem fileSys = cluster.getFileSystem();
|
DistributedFileSystem fileSys = cluster.getFileSystem();
|
||||||
try {
|
try {
|
||||||
Path file1 = new Path("smallblocktest.dat");
|
Path file1 = new Path("/smallblocktest.dat");
|
||||||
DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
|
DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
|
||||||
(short) 1, seed);
|
(short) 1, seed);
|
||||||
checkFile(fileSys, file1);
|
checkFile(fileSys, file1);
|
||||||
|
@ -98,11 +98,15 @@ public static void setFactory(Configuration conf) {
|
|||||||
Factory.class.getName());
|
Factory.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static byte simulatedByte(Block b, long offsetInBlk) {
|
||||||
|
byte firstByte = (byte) (b.getBlockId() % Byte.MAX_VALUE);
|
||||||
|
return (byte) ((firstByte + offsetInBlk) % Byte.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
public static final String CONFIG_PROPERTY_CAPACITY =
|
public static final String CONFIG_PROPERTY_CAPACITY =
|
||||||
"dfs.datanode.simulateddatastorage.capacity";
|
"dfs.datanode.simulateddatastorage.capacity";
|
||||||
|
|
||||||
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
|
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
|
||||||
public static final byte DEFAULT_DATABYTE = 9;
|
|
||||||
|
|
||||||
public static final String CONFIG_PROPERTY_STATE =
|
public static final String CONFIG_PROPERTY_STATE =
|
||||||
"dfs.datanode.simulateddatastorage.state";
|
"dfs.datanode.simulateddatastorage.state";
|
||||||
@ -182,9 +186,9 @@ synchronized public void setNumBytes(long length) {
|
|||||||
synchronized SimulatedInputStream getIStream() {
|
synchronized SimulatedInputStream getIStream() {
|
||||||
if (!finalized) {
|
if (!finalized) {
|
||||||
// throw new IOException("Trying to read an unfinalized block");
|
// throw new IOException("Trying to read an unfinalized block");
|
||||||
return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE);
|
return new SimulatedInputStream(oStream.getLength(), theBlock);
|
||||||
} else {
|
} else {
|
||||||
return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE);
|
return new SimulatedInputStream(theBlock.getNumBytes(), theBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -991,21 +995,19 @@ public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static private class SimulatedInputStream extends java.io.InputStream {
|
static private class SimulatedInputStream extends java.io.InputStream {
|
||||||
|
|
||||||
|
|
||||||
byte theRepeatedData = 7;
|
|
||||||
final long length; // bytes
|
final long length; // bytes
|
||||||
int currentPos = 0;
|
int currentPos = 0;
|
||||||
byte[] data = null;
|
byte[] data = null;
|
||||||
|
Block theBlock = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An input stream of size l with repeated bytes
|
* An input stream of size l with repeated bytes
|
||||||
* @param l size of the stream
|
* @param l size of the stream
|
||||||
* @param iRepeatedData byte that is repeated in the stream
|
* @param iRepeatedData byte that is repeated in the stream
|
||||||
*/
|
*/
|
||||||
SimulatedInputStream(long l, byte iRepeatedData) {
|
SimulatedInputStream(long l, Block b) {
|
||||||
length = l;
|
length = l;
|
||||||
theRepeatedData = iRepeatedData;
|
theBlock = b;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1031,8 +1033,7 @@ public int read() throws IOException {
|
|||||||
if (data !=null) {
|
if (data !=null) {
|
||||||
return data[currentPos++];
|
return data[currentPos++];
|
||||||
} else {
|
} else {
|
||||||
currentPos++;
|
return simulatedByte(theBlock, currentPos++);
|
||||||
return theRepeatedData;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1052,8 +1053,8 @@ public int read(byte[] b) throws IOException {
|
|||||||
if (data != null) {
|
if (data != null) {
|
||||||
System.arraycopy(data, currentPos, b, 0, bytesRead);
|
System.arraycopy(data, currentPos, b, 0, bytesRead);
|
||||||
} else { // all data is zero
|
} else { // all data is zero
|
||||||
for (int i : b) {
|
for (int i = 0; i < bytesRead; i++) {
|
||||||
b[i] = theRepeatedData;
|
b[i] = simulatedByte(theBlock, currentPos + i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
currentPos += bytesRead;
|
currentPos += bytesRead;
|
||||||
|
@ -144,7 +144,8 @@ void checkBlockDataAndSize(SimulatedFSDataset fsdataset, ExtendedBlock b,
|
|||||||
long lengthRead = 0;
|
long lengthRead = 0;
|
||||||
int data;
|
int data;
|
||||||
while ((data = input.read()) != -1) {
|
while ((data = input.read()) != -1) {
|
||||||
assertEquals(SimulatedFSDataset.DEFAULT_DATABYTE, data);
|
assertEquals(SimulatedFSDataset.simulatedByte(b.getLocalBlock(),
|
||||||
|
lengthRead), data);
|
||||||
lengthRead++;
|
lengthRead++;
|
||||||
}
|
}
|
||||||
assertEquals(expectedLen, lengthRead);
|
assertEquals(expectedLen, lengthRead);
|
||||||
|
Loading…
Reference in New Issue
Block a user