HDFS-5728. Block recovery will fail if the metafile does not have crc for all chunks of the block. Contributed by Vinay.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1561223 13f79535-47bb-0310-9956-ffa450edef68
parent 3497e76e19
commit dd1bc7e1c7
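Summary of the change, for context: before this patch, validateIntegrity in the datanode's replica-scanning code only computed how many block bytes were covered by valid checksums; if the meta file was missing CRCs for the tail of the block, the un-checksummed bytes stayed in the block file and later made block recovery fail. The patch renames the method to validateIntegrityAndSetLength and additionally truncates the block file to the checksummed length. The fragment below is a minimal stand-alone sketch of that idea, not the Hadoop code itself; the class and method names, the 7-byte meta-file header size, and the 4-byte CRC width are assumptions for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * Stand-alone sketch (hypothetical names): work out how many block bytes are
 * covered by CRCs in the meta file and cut the block file back to that length.
 */
public class TruncateToChecksummedLength {
  private static final int CHECKSUM_SIZE = 4;    // assumed: one 4-byte CRC per chunk
  private static final int META_HEADER_SIZE = 7; // assumed meta file header size

  static long truncateToValidLength(File blockFile, File metaFile,
      int bytesPerChecksum) throws IOException {
    // Number of chunks that actually have a CRC stored in the meta file.
    long crcCount = (metaFile.length() - META_HEADER_SIZE) / CHECKSUM_SIZE;
    long coveredLength = Math.min(blockFile.length(), crcCount * bytesPerChecksum);

    // The real patch also re-verifies the CRC of the last covered chunk and
    // drops that chunk too if it does not match; that step is omitted here.
    if (blockFile.length() > coveredLength) {
      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
      try {
        blockRAF.setLength(coveredLength); // discard bytes that have no CRC
      } finally {
        blockRAF.close();
      }
    }
    return coveredLength;
  }
}
```

With the block file trimmed this way, the replica reported after a datanode restart never claims bytes that cannot be verified, so the recovery triggered by recoverLease() can complete.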
@@ -594,6 +594,9 @@ Release 2.4.0 - UNRELEASED
    HDFS-5806. balancer should set SoTimeout to avoid indefinite hangs.
    (Nathan Roberts via Andrew Wang).

    HDFS-5728. Block recovery will fail if the metafile does not have crc
    for all chunks of the block (Vinay via kihwal)

  BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS

    HDFS-4985. Add storage type to the protocol and expose it in block report
@@ -23,6 +23,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
@@ -191,7 +192,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
            blockFile.length(), genStamp, volume, blockFile.getParentFile());
      } else {
        newReplica = new ReplicaWaitingToBeRecovered(blockId,
            validateIntegrity(blockFile, genStamp),
            validateIntegrityAndSetLength(blockFile, genStamp),
            genStamp, volume, blockFile.getParentFile());
      }

@@ -214,7 +215,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
   * @param genStamp generation stamp of the block
   * @return the number of valid bytes
   */
  private long validateIntegrity(File blockFile, long genStamp) {
  private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
    DataInputStream checksumIn = null;
    InputStream blockIn = null;
    try {
@@ -257,11 +258,25 @@ private long validateIntegrity(File blockFile, long genStamp) {
      IOUtils.readFully(blockIn, buf, 0, lastChunkSize);

      checksum.update(buf, 0, lastChunkSize);
      long validFileLength;
      if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
        return lastChunkStartPos + lastChunkSize;
        validFileLength = lastChunkStartPos + lastChunkSize;
      } else { // last chunck is corrupt
        return lastChunkStartPos;
        validFileLength = lastChunkStartPos;
      }

      // truncate if extra bytes are present without CRC
      if (blockFile.length() > validFileLength) {
        RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
        try {
          // truncate blockFile
          blockRAF.setLength(validFileLength);
        } finally {
          blockRAF.close();
        }
      }

      return validFileLength;
    } catch (IOException e) {
      FsDatasetImpl.LOG.warn(e);
      return 0;
@@ -19,20 +19,28 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;

public class TestLeaseRecovery {
@@ -148,4 +156,55 @@ public void testBlockSynchronization() throws Exception {
      if (cluster != null) {cluster.shutdown();}
    }
  }

  /**
   * Block Recovery when the meta file not having crcs for all chunks in block
   * file
   */
  @Test
  public void testBlockRecoveryWithLessMetafile() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        UserGroupInformation.getCurrentUser().getShortUserName());
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
        .build();
    Path file = new Path("/testRecoveryFile");
    DistributedFileSystem dfs = cluster.getFileSystem();
    FSDataOutputStream out = dfs.create(file);
    int count = 0;
    while (count < 2 * 1024 * 1024) {
      out.writeBytes("Data");
      count += 4;
    }
    out.hsync();
    // abort the original stream
    ((DFSOutputStream) out.getWrappedStream()).abort();

    LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations(
        file.toString(), 0, count);
    ExtendedBlock block = locations.get(0).getBlock();
    DataNode dn = cluster.getDataNodes().get(0);
    BlockLocalPathInfo localPathInfo = dn.getBlockLocalPathInfo(block, null);
    File metafile = new File(localPathInfo.getMetaPath());
    assertTrue(metafile.exists());

    // reduce the block meta file size
    RandomAccessFile raf = new RandomAccessFile(metafile, "rw");
    raf.setLength(metafile.length() - 20);
    raf.close();

    // restart DN to make replica to RWR
    DataNodeProperties dnProp = cluster.stopDataNode(0);
    cluster.restartDataNode(dnProp, true);

    // try to recover the lease
    DistributedFileSystem newdfs = (DistributedFileSystem) FileSystem
        .newInstance(cluster.getConfiguration(0));
    count = 0;
    while (++count < 10 && !newdfs.recoverLease(file)) {
      Thread.sleep(1000);
    }
    assertTrue("File should be closed", newdfs.recoverLease(file));

  }
}
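A note on the numbers the new test relies on, assuming the HDFS defaults (512-byte checksum chunks and 4-byte CRC values; both are configurable, so this is only a rough reading): shortening the meta file by 20 bytes removes the CRC entries for the last five chunks, leaving roughly 2.5 KB at the end of the 2 MB block without checksums, which is exactly the condition the datanode-side truncation above now handles.

```java
/**
 * Rough arithmetic behind the test's 20-byte meta file truncation
 * (assumes default dfs.bytes-per-checksum = 512 and 4-byte CRCs).
 */
public class MetaTruncationMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512; // assumed default chunk size
    int checksumSize = 4;       // assumed CRC width in the meta file
    int removedBytes = 20;      // raf.setLength(metafile.length() - 20)

    int removedCrcs = removedBytes / checksumSize;       // 5 CRC entries
    int uncoveredBytes = removedCrcs * bytesPerChecksum; // up to 2560 data bytes
    System.out.println(removedCrcs + " CRC entries removed; up to "
        + uncoveredBytes + " block bytes left without a checksum");
  }
}
```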