MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1351548 13f79535-47bb-0310-9956-ffa450edef68
Scott Chun-Yang Chen 2012-06-19 00:55:28 +00:00
parent ed7040f07b
commit 277b3dd736
65 changed files with 845 additions and 203 deletions


@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly>
<id>hadoop-raid-dist</id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<!-- Configuration files -->
<fileSet>
<directory>${basedir}/src/main/conf</directory>
<outputDirectory>/etc/hadoop</outputDirectory>
<includes>
<include>*</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/sbin</directory>
<outputDirectory>/sbin</outputDirectory>
<includes>
<include>*</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/libexec</directory>
<outputDirectory>/libexec</outputDirectory>
<includes>
<include>*</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<!-- Documentation -->
<fileSet>
<directory>${project.build.directory}/site</directory>
<outputDirectory>/share/doc/hadoop/raid</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>/share/hadoop/${hadoop.component}/lib</outputDirectory>
<unpack>false</unpack>
<scope>runtime</scope>
<useProjectArtifact>true</useProjectArtifact>
</dependencySet>
</dependencySets>
</assembly>


@ -52,6 +52,11 @@
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-raid</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
@ -120,6 +125,7 @@
run cp -r $ROOT/hadoop-common-project/hadoop-common/target/hadoop-common-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs/target/hadoop-hdfs-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-raid/target/hadoop-hdfs-raid-${project.version}/* .
run cp -r $ROOT/hadoop-mapreduce-project/target/hadoop-mapreduce-${project.version}/* .
run cp -r $ROOT/hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-${project.version}/* .
echo


@ -0,0 +1,202 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project-dist</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project-dist</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-raid</artifactId>
<version>3.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Apache Hadoop HDFS Raid</name>
<description>Apache Hadoop HDFS Raid</description>
<properties>
<hadoop.component>raid</hadoop.component>
<is.hadoop.component>false</is.hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>create-mrapp-generated-classpath</id>
<phase>generate-test-resources</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<!--
This is needed to run the unit tests. It generates the classpath that is
required in the environment of the launch container in the mini MR/YARN cluster.
-->
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>docs</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<executions>
<execution>
<id>docs</id>
<phase>prepare-package</phase>
<goals>
<goal>site</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>dist</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-assemblies</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>dist</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<descriptorRefs>
<descriptorRef>hadoop-raid-dist</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>tar</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target if="tar">
<!-- Using Unix script to preserve symlinks -->
<echo file="${project.build.directory}/dist-maketar.sh">
which cygpath 2> /dev/null
if [ $? = 1 ]; then
BUILD_DIR="${project.build.directory}"
else
BUILD_DIR=`cygpath --unix '${project.build.directory}'`
fi
cd $BUILD_DIR
tar czf ${project.artifactId}-${project.version}.tar.gz ${project.artifactId}-${project.version}
</echo>
<exec executable="sh" dir="${project.build.directory}" failonerror="true">
<arg line="./dist-maketar.sh"/>
</exec>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>


@ -34,7 +34,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@ -56,8 +58,10 @@ public class RaidBlockSender implements java.io.Closeable {
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
/** Initial position to read */
private long initialOffset;
private long endOffset; // ending position
private int bytesPerChecksum; // chunk size
private int chunkSize; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
private boolean chunkOffsetOK; // if need to send chunk offset
@ -74,6 +78,8 @@ public class RaidBlockSender implements java.io.Closeable {
* not sure if there will be much more improvement.
*/
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
private volatile ChunkChecksum lastChunkChecksum = null;
@ -125,12 +131,13 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
* is mostly corrupted. For now just truncate bytesPerChecksum to
* blockLength.
*/
bytesPerChecksum = checksum.getBytesPerChecksum();
if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
int size = checksum.getBytesPerChecksum();
if (size > 10*1024*1024 && size > replicaVisibleLength) {
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
Math.max((int)replicaVisibleLength, 10*1024*1024));
bytesPerChecksum = checksum.getBytesPerChecksum();
size = checksum.getBytesPerChecksum();
}
chunkSize = size;
checksumSize = checksum.getChecksumSize();
if (length < 0) {
@ -147,12 +154,12 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
throw new IOException(msg);
}
offset = (startOffset - (startOffset % bytesPerChecksum));
offset = (startOffset - (startOffset % chunkSize));
if (length >= 0) {
// Make sure endOffset points to end of a checksumed chunk.
long tmpLen = startOffset + length;
if (tmpLen % bytesPerChecksum != 0) {
tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
if (tmpLen % chunkSize != 0) {
tmpLen += (chunkSize - tmpLen % chunkSize);
}
if (tmpLen < endOffset) {
// will use on-disk checksum here since the end is a stable chunk
@ -162,7 +169,7 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
// seek to the right offsets
if (offset > 0) {
long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {
// Should we use seek() for checksum file as well?
@ -229,55 +236,83 @@ private static IOException ioeToSocketException(IOException ioe) {
}
/**
* Sends up to maxChunks chunks of data.
*
* When blockInPosition is >= 0, assumes 'out' is a
* {@link SocketOutputStream} and tries
* {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
* send data (and updates blockInPosition).
* @param datalen Length of data
* @return number of chunks for data of given size
*/
private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
throws IOException {
// Sends multiple chunks in one packet with a single write().
private int numberOfChunks(long datalen) {
return (int) ((datalen + chunkSize - 1)/chunkSize);
}
int len = (int) Math.min(endOffset - offset,
(((long) bytesPerChecksum) * ((long) maxChunks)));
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
int packetLen = len + numChunks*checksumSize + 4;
boolean lastDataPacket = offset + len == endOffset && len > 0;
/**
* Write packet header into {@code pkt}
*/
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
PacketHeader header = new PacketHeader(
packetLen, offset, seqno, (len == 0), len);
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
}
int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
/**
* Read checksum into given buffer
* @param buf buffer to read the checksum into
* @param checksumOffset offset at which to write the checksum into buf
* @param checksumLen length of checksum to write
* @throws IOException on error
*/
private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException {
if (checksumSize <= 0 && checksumIn == null) {
return;
}
try {
checksumIn.readFully(buf, checksumOff, checksumLen);
checksumIn.readFully(buf, checksumOffset, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to veirfy checksum for data" +
" at offset " + offset + " for block " + block + " got : "
+ StringUtils.stringifyException(e));
LOG.warn(" Could not read or failed to veirfy checksum for data"
+ " at offset " + offset + " for block " + block, e);
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumOff < checksumLen) {
if (checksumOffset < checksumLen) {
// Just fill the array with zeros.
Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
}
} else {
throw e;
}
}
}
/**
* Sends a packet with up to maxChunks chunks of data.
*
* @param pkt buffer used for writing packet data
* @param maxChunks maximum number of chunks to send
* @param out stream to send data to
* @param transferTo use transferTo to send data
* @param throttler used for throttling data transfer bandwidth
*/
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));
int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
int checksumDataLen = numChunks * checksumSize;
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
writePacketHeader(pkt, dataLen, packetLen);
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumLen - checksumSize;
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
@ -286,61 +321,85 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
}
}
int dataOff = checksumOff + checksumLen;
if (blockInPosition < 0) {
//normal transfer
IOUtils.readFully(blockIn, buf, dataOff, len);
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
int dOff = dataOff;
int cOff = checksumOff;
int dLeft = len;
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
for (int i=0; i<numChunks; i++) {
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
sockOut.write(buf, 0, dataOff); // First write checksum
// no need to flush, since we know out is not a buffered stream.
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
blockInPosition, dataLen);
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, 0, dataOff + dataLen);
}
} catch (IOException e) {
/* Exception while writing to the client. Connection closure from
* the other end is mostly the case and we do not care much about
* it. But other things can go wrong, especially in transferTo(),
* which we do not want to ignore.
*
* The message parsing below should not be considered a good
* coding example. NEVER do it to drive program logic. NEVER.
* It was done here because NIO throws an IOException for EPIPE.
*/
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
throw ioeToSocketException(e);
}
if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}
return dataLen;
}
/**
* Compute checksum for chunks and verify the checksum that is read from
* the metadata file is correct.
*
* @param buf buffer that has checksum and data
* @param dataOffset position where data is written in the buf
* @param datalen length of data
* @param numChunks number of chunks corresponding to data
* @param checksumOffset offset where checksum is written in the buf
* @throws ChecksumException on failed checksum verification
*/
public void verifyChecksum(final byte[] buf, final int dataOffset,
final int datalen, final int numChunks, final int checksumOffset)
throws ChecksumException {
int dOff = dataOffset;
int cOff = checksumOffset;
int dLeft = datalen;
for (int i = 0; i < numChunks; i++) {
checksum.reset();
int dLen = Math.min(dLeft, bytesPerChecksum);
int dLen = Math.min(dLeft, chunkSize);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) {
long failedPos = offset + len -dLeft;
throw new ChecksumException("Checksum failed at " +
failedPos, failedPos);
long failedPos = offset + datalen - dLeft;
throw new ChecksumException("Checksum failed at " + failedPos,
failedPos);
}
dLeft -= dLen;
dOff += dLen;
cOff += checksumSize;
}
}
//writing is done below (mainly to handle IOException)
}
try {
if (blockInPosition >= 0) {
//use transferTo(). Checks on out and blockIn are already done.
SocketOutputStream sockOut = (SocketOutputStream)out;
//first write the packet
sockOut.write(buf, 0, dataOff);
// no need to flush, since we know out is not a buffered stream.
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
blockInPosition, len);
blockInPosition += len;
} else {
// normal transfer
out.write(buf, 0, dataOff + len);
}
} catch (IOException e) {
/* exception while writing to the client (well, with transferTo(),
* it could also be while reading from the local file).
*/
throw ioeToSocketException(e);
}
return len;
}
/**
* sendBlock() is used to read block and its metadata and stream the data to
@ -356,79 +415,61 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
*/
public long sendBlock(DataOutputStream out, OutputStream baseStream)
throws IOException {
if( out == null ) {
if (out == null) {
throw new IOException( "out stream is null" );
}
long initialOffset = offset;
initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
try {
checksum.writeHeader(out);
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN;
if (transferToAllowed && !verifyChecksum &&
baseStream instanceof SocketOutputStream &&
blockIn instanceof FileInputStream) {
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
// blockInPosition also indicates sendChunks() uses transferTo.
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// assure a minimum buffer size.
maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE,
MIN_BUFFER_WITH_TRANSFERTO)
+ bytesPerChecksum - 1)/bytesPerChecksum;
// allocate smaller buffer while using transferTo().
// Smaller packet size to only hold checksum when doing transferTo
pktSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
(HdfsConstants.IO_FILE_BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
long len = sendChunks(pktBuf, maxChunksPerPacket,
streamForSendChunks);
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, null);
offset += len;
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
checksumSize);
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
try {
// send an empty packet to mark the end of the block
sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
null);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
blockReadFully = true;
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();
}
blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
return totalRead;
}
@ -440,6 +481,13 @@ public static interface InputStreamFactory {
public InputStream createStream(long offset) throws IOException;
}
/**
* @return the checksum type that will be used with this block transfer.
*/
public DataChecksum getChecksum() {
return checksum;
}
private static class BlockInputStreamFactory implements InputStreamFactory {
private final ExtendedBlock block;
private final FsDatasetSpi<?> data;
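
The refactoring above splits the old sendChunks() into numberOfChunks(), writePacketHeader(), readChecksum(), verifyChecksum() and sendPacket(). The standalone sketch below reproduces the buffer-sizing arithmetic so the normal path and the transferTo path are easy to compare; the constant values are illustrative assumptions standing in for PacketHeader.PKT_HEADER_LEN and HdfsConstants.IO_FILE_BUFFER_SIZE, not values taken from this patch.

// Illustrative, self-contained sketch of the packet/buffer arithmetic used by
// sendPacket()/sendBlock(). The constant values below are assumptions for the example.
public class PacketSizingSketch {
  static int numberOfChunks(long datalen, int chunkSize) {
    return (int) ((datalen + chunkSize - 1) / chunkSize);
  }

  public static void main(String[] args) {
    final int chunkSize = 512;          // bytes covered by one checksum
    final int checksumSize = 4;         // CRC32 checksum size
    final int pktHeaderLen = 25;        // stand-in for PacketHeader.PKT_HEADER_LEN
    final int ioFileBufferSize = 4096;  // stand-in for HdfsConstants.IO_FILE_BUFFER_SIZE
    final int minBufferWithTransferTo = 64 * 1024;

    // Normal path: the packet buffer holds the header, the checksums and the data.
    int maxChunks = Math.max(1, numberOfChunks(ioFileBufferSize, chunkSize));
    int pktSize = pktHeaderLen + (chunkSize + checksumSize) * maxChunks;
    System.out.println("normal: " + maxChunks + " chunks/packet, buffer " + pktSize + " bytes");

    // transferTo path: only the header and checksums pass through the buffer;
    // the data itself is sent straight from the file channel.
    int transferToChunks = numberOfChunks(
        Math.max(ioFileBufferSize, minBufferWithTransferTo), chunkSize);
    int transferToPktSize = pktHeaderLen + checksumSize * transferToChunks;
    System.out.println("transferTo: " + transferToChunks + " chunks/packet, buffer "
        + transferToPktSize + " bytes");

    // Per-packet length field as computed in sendPacket(): data + checksums + 4.
    int dataLen = chunkSize * maxChunks;
    int packetLen = dataLen + numberOfChunks(dataLen, chunkSize) * checksumSize + 4;
    System.out.println("packetLen for " + dataLen + " data bytes: " + packetLen);
  }
}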


@ -50,7 +50,7 @@ public static LocatedBlocks getBlockLocations(final FSNamesystem namesystem,
final boolean doAccessTime, final boolean needBlockToken
) throws FileNotFoundException, UnresolvedLinkException, IOException {
return namesystem.getBlockLocations(src, offset, length,
doAccessTime, needBlockToken);
doAccessTime, needBlockToken, true);
}
}


@ -18,6 +18,9 @@
package org.apache.hadoop.raid;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -44,14 +47,17 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,6 +67,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.net.NetUtils;
@ -649,7 +656,7 @@ static DataInputStream computeMetadata(Configuration conf,
mdOut.writeShort(BlockMetadataHeader.VERSION);
// Create a summer and write out its header.
int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
int bytesPerChecksum = conf.getInt("dfs.bytes-per-checksum", 512);
DataChecksum sum =
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
@ -780,9 +787,11 @@ private void sendFixedBlock(DatanodeInfo datanode,
});
DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
DataChecksum checksum = blockSender.getChecksum();
new Sender(out).writeBlock(block.getBlock(), block.getBlockToken(), "",
nodes, null, BlockConstructionStage.PIPELINE_SETUP_CREATE,
1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(metadataIn));
1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(
checksum.getChecksumType(), checksum.getBytesPerChecksum()));
blockSender.sendBlock(out, baseStream);
LOG.info("Sent block " + block.getBlock() + " to " + datanode.getName());


@ -112,6 +112,8 @@ public DirectoryTraversal(
public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
List<FileStatus> filtered = new ArrayList<FileStatus>();
if (limit == 0)
return filtered;
// We need this semaphore to block when the number of running workitems
// is equal to the number of threads. FixedThreadPool limits the number
@ -120,20 +122,26 @@ public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
Semaphore slots = new Semaphore(numThreads);
while (true) {
synchronized(filtered) {
if (filtered.size() >= limit) break;
}
FilterFileWorkItem work = null;
try {
slots.acquire();
synchronized(filtered) {
if (filtered.size() >= limit) {
slots.release();
break;
}
}
Node next = getNextDirectoryNode();
if (next == null) {
slots.release();
break;
}
work = new FilterFileWorkItem(filter, next, filtered, slots);
slots.acquire();
} catch (InterruptedException ie) {
slots.release();
break;
} catch (IOException e) {
slots.release();
break;
}
executor.execute(work);
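
The hunk above moves slots.acquire() ahead of the limit check and releases the permit on every early-exit path (limit reached, no more directories, InterruptedException, IOException), so the traversal can no longer strand a permit or submit work without a slot. Below is a simplified, self-contained sketch of that acquire-check-release shape; the names and the plain integer list standing in for the filtered FileStatus results are illustrative, not the class itself.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BoundedSubmitSketch {
  public static void main(String[] args) throws InterruptedException {
    final int numThreads = 2;
    final int limit = 5;
    final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    final Semaphore slots = new Semaphore(numThreads);
    final List<Integer> results = new ArrayList<Integer>();

    for (int next = 0; ; next++) {
      slots.acquire();                    // block while all workers are busy
      synchronized (results) {
        if (results.size() >= limit) {    // enough results collected already
          slots.release();                // hand the permit back before leaving
          break;
        }
      }
      final int item = next;              // stand-in for the next directory node
      executor.execute(new Runnable() {
        public void run() {
          try {
            synchronized (results) {
              results.add(item);          // stand-in for FilterFileWorkItem
            }
          } finally {
            slots.release();              // the worker always returns its slot
          }
        }
      });
    }
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println("collected " + results.size() + " items");
  }
}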


@ -277,6 +277,7 @@ public boolean startDistRaid() throws IOException {
*/
public boolean checkComplete() throws IOException {
JobID jobID = runningJob.getJobID();
LOG.info("Checking job " + jobID);
try {
if (runningJob.isComplete()) {
// delete job directory


@ -208,7 +208,7 @@ public void solveVandermondeSystem(int[] x, int[] y) {
* @param len consider x and y only from 0...len-1
*/
public void solveVandermondeSystem(int[] x, int[] y, int len) {
assert(x.length <= len && y.length <= len);
assert(y.length <= len);
for (int i = 0; i < len - 1; i++) {
for (int j = len - 1; j > i; j--) {
y[j] = y[j] ^ mulTable[x[i]][y[j - 1]];
@ -302,4 +302,49 @@ public int substitute(int[] p, int x) {
}
return result;
}
/**
* Perform Gaussian elimination on the given matrix. This matrix has to be a
* fat matrix (number of columns > number of rows).
*/
public void gaussianElimination(int[][] matrix) {
assert(matrix != null && matrix.length > 0 && matrix[0].length > 0
&& matrix.length < matrix[0].length);
int height = matrix.length;
int width = matrix[0].length;
for (int i = 0; i < height; i++) {
boolean pivotFound = false;
// scan the column for a nonzero pivot and swap it to the diagonal
for (int j = i; j < height; j++) {
if (matrix[j][i] != 0) {
int[] tmp = matrix[i];
matrix[i] = matrix[j];
matrix[j] = tmp;
pivotFound = true;
break;
}
}
if (!pivotFound) {
continue;
}
int pivot = matrix[i][i];
for (int j = i; j < width; j++) {
matrix[i][j] = divide(matrix[i][j], pivot);
}
for (int j = i + 1; j < height; j++) {
int lead = matrix[j][i];
for (int k = i; k < width; k++) {
matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
}
}
}
for (int i = height - 1; i >=0; i--) {
for (int j = 0; j < i; j++) {
int lead = matrix[j][i];
for (int k = i; k < width; k++) {
matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
}
}
}
}
}
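
gaussianElimination() above row-reduces a fat matrix of field elements with the class's own divide/add/multiply; ReedSolomonCode.computeErrorLocations() later feeds it the syndrome matrix. As a rough illustration of the same forward-elimination and back-substitution shape, here is a standalone sketch over GF(2), where addition is XOR and any nonzero pivot is already 1 so the divide step disappears. It is not the GaloisField method itself, just the control flow in miniature.

// Row-reduce an augmented matrix over GF(2): add = XOR, and every nonzero
// pivot equals 1, so no normalization by the pivot is needed.
public class Gf2EliminationSketch {
  static void gaussianElimination(int[][] m) {
    int height = m.length, width = m[0].length;
    for (int i = 0; i < height; i++) {
      // scan the column for a nonzero pivot and swap that row onto the diagonal
      int pivotRow = -1;
      for (int j = i; j < height; j++) {
        if (m[j][i] != 0) { pivotRow = j; break; }
      }
      if (pivotRow < 0) continue;
      int[] tmp = m[i]; m[i] = m[pivotRow]; m[pivotRow] = tmp;
      // eliminate the column below the pivot
      for (int j = i + 1; j < height; j++) {
        if (m[j][i] != 0) {
          for (int k = i; k < width; k++) m[j][k] ^= m[i][k];
        }
      }
    }
    // back-substitution: clear the column above each pivot
    for (int i = height - 1; i >= 0; i--) {
      for (int j = 0; j < i; j++) {
        if (m[j][i] != 0) {
          for (int k = i; k < width; k++) m[j][k] ^= m[i][k];
        }
      }
    }
  }

  public static void main(String[] args) {
    // Solve x0 + x1 = 1 and x1 = 1 over GF(2), as an augmented 2x3 matrix.
    int[][] m = { {1, 1, 1}, {0, 1, 1} };
    gaussianElimination(m);
    System.out.println("x0=" + m[0][2] + " x1=" + m[1][2]);  // prints x0=0 x1=1
  }
}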


@ -44,12 +44,13 @@ class JobMonitor implements Runnable {
volatile boolean running = true;
private Map<String, List<DistRaid>> jobs;
public static final String JOBMONITOR_INTERVAL_KEY = "raid.jobmonitor.interval";
private long jobMonitorInterval;
private volatile long jobsMonitored = 0;
private volatile long jobsSucceeded = 0;
public JobMonitor(Configuration conf) {
jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
jobMonitorInterval = conf.getLong(JOBMONITOR_INTERVAL_KEY, 60000);
jobs = new java.util.HashMap<String, List<DistRaid>>();
}
@ -112,6 +113,7 @@ public void doMonitor() {
} catch (IOException ioe) {
// If there was an error, consider the job finished.
addJob(finishedJobs, key, job);
LOG.error("JobMonitor exception", ioe);
}
}
}
@ -160,6 +162,17 @@ public long jobsSucceeded() {
return this.jobsSucceeded;
}
// For test code
int runningJobsCount() {
int total = 0;
synchronized(jobs) {
for (String key: jobs.keySet()) {
total += jobs.get(key).size();
}
}
return total;
}
private static void addJob(Map<String, List<DistRaid>> jobsMap,
String jobName, DistRaid job) {
synchronized(jobsMap) {


@ -80,6 +80,8 @@ public abstract class RaidNode implements RaidProtocol {
}
public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
public static final long SLEEP_TIME = 10000L; // 10 seconds
public static final String TRIGGER_MONITOR_SLEEP_TIME_KEY =
"hdfs.raid.trigger.monitor.sleep.time";
public static final int DEFAULT_PORT = 60000;
// Default stripe length = 5, parity length for RS code = 3
public static final int DEFAULT_STRIPE_LENGTH = 5;
@ -126,6 +128,7 @@ public abstract class RaidNode implements RaidProtocol {
/** Daemon thread to trigger policies */
Daemon triggerThread = null;
public static long triggerMonitorSleepTime = SLEEP_TIME;
/** Daemon thread to delete obsolete parity files */
PurgeMonitor purgeMonitor = null;
@ -299,6 +302,10 @@ private void initialize(Configuration conf)
this.blockFixer = BlockFixer.createBlockFixer(conf);
this.blockFixerThread = new Daemon(this.blockFixer);
this.blockFixerThread.start();
// start the daemon thread to fire policies appropriately
RaidNode.triggerMonitorSleepTime = conf.getLong(
TRIGGER_MONITOR_SLEEP_TIME_KEY,
SLEEP_TIME);
// start the daemon thread to fire policies appropriately
this.triggerThread = new Daemon(new TriggerMonitor());
@ -503,7 +510,7 @@ private void doProcess() throws IOException, InterruptedException {
}
}
while (running) {
Thread.sleep(SLEEP_TIME);
Thread.sleep(RaidNode.triggerMonitorSleepTime);
boolean reloaded = configMgr.reloadConfigsIfNecessary();
if (reloaded) {
@ -542,7 +549,7 @@ private void doProcess() throws IOException, InterruptedException {
// Apply the action on accepted paths
LOG.info("Triggering Policy Action " + info.getName() +
" " + info.getSrcPath());
" " + info.getSrcPath() + " raid " + filteredPaths.size() + " files");
try {
raidFiles(info, filteredPaths);
} catch (Exception e) {


@ -43,6 +43,7 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -296,9 +297,22 @@ public Path[] recover(String cmd, String argv[], int startindex)
for (int i = startindex; i < argv.length; i = i + 2) {
String path = argv[i];
long corruptOffset = Long.parseLong(argv[i+1]);
LOG.debug("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
paths[j] = new Path(raidnode.recoverFile(path, corruptOffset));
LOG.debug("Raidshell created recovery file " + paths[j]);
LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
Path recovered = new Path("/tmp/recovered." + System.currentTimeMillis());
FileSystem fs = recovered.getFileSystem(conf);
DistributedFileSystem dfs = (DistributedFileSystem)fs;
Configuration raidConf = new Configuration(conf);
raidConf.set("fs.hdfs.impl",
"org.apache.hadoop.hdfs.DistributedRaidFileSystem");
raidConf.set("fs.raid.underlyingfs.impl",
"org.apache.hadoop.hdfs.DistributedFileSystem");
raidConf.setBoolean("fs.hdfs.impl.disable.cache", true);
java.net.URI dfsUri = dfs.getUri();
FileSystem raidFs = FileSystem.get(dfsUri, raidConf);
FileUtil.copy(raidFs, new Path(path), fs, recovered, false, conf);
paths[j] = recovered;
LOG.info("Raidshell created recovery file " + paths[j]);
j++;
}
return paths;


@ -17,6 +17,8 @@
*/
package org.apache.hadoop.raid;
import java.util.Set;
public class ReedSolomonCode implements ErasureCode {
@ -103,4 +105,79 @@ public int paritySize() {
public int symbolSize() {
return (int) Math.round(Math.log(GF.getFieldSize()) / Math.log(2));
}
/**
* Given parity symbols followed by message symbols, return the locations of
* symbols that are corrupted. Can resolve up to (parity length / 2) error
* locations.
* @param data The message and parity. The parity should be placed in the
* first part of the array. In each integer, the relevant portion
* is present in the least significant bits of each int.
* The number of elements in data is stripeSize() + paritySize().
* <b>Note that data may be changed after calling this method.</b>
* @param errorLocations The set to put the error location results
* @return true If the locations can be resolved, return true.
*/
public boolean computeErrorLocations(int[] data,
Set<Integer> errorLocations) {
assert(data.length == paritySize + stripeSize && errorLocations != null);
errorLocations.clear();
int maxError = paritySize / 2;
int[][] syndromeMatrix = new int[maxError][];
for (int i = 0; i < syndromeMatrix.length; ++i) {
syndromeMatrix[i] = new int[maxError + 1];
}
int[] syndrome = new int[paritySize];
if (computeSyndrome(data, syndrome)) {
// Parity check OK. No error location added.
return true;
}
for (int i = 0; i < maxError; ++i) {
for (int j = 0; j < maxError + 1; ++j) {
syndromeMatrix[i][j] = syndrome[i + j];
}
}
GF.gaussianElimination(syndromeMatrix);
int[] polynomial = new int[maxError + 1];
polynomial[0] = 1;
for (int i = 0; i < maxError; ++i) {
polynomial[i + 1] = syndromeMatrix[maxError - 1 - i][maxError];
}
for (int i = 0; i < paritySize + stripeSize; ++i) {
int possibleRoot = GF.divide(1, primitivePower[i]);
if (GF.substitute(polynomial, possibleRoot) == 0) {
errorLocations.add(i);
}
}
// Now recover with error locations and check the syndrome again
int[] locations = new int[errorLocations.size()];
int k = 0;
for (int loc : errorLocations) {
locations[k++] = loc;
}
int [] erasedValue = new int[locations.length];
decode(data, locations, erasedValue);
for (int i = 0; i < locations.length; ++i) {
data[locations[i]] = erasedValue[i];
}
return computeSyndrome(data, syndrome);
}
/**
* Compute the syndrome of the input [parity, message]
* @param data [parity, message]
* @param syndrome The syndromes (checksums) of the data
* @return true if the syndromes are all zeros
*/
private boolean computeSyndrome(int[] data, int [] syndrome) {
boolean corruptionFound = false;
for (int i = 0; i < paritySize; i++) {
syndrome[i] = GF.substitute(data, primitivePower[i]);
if (syndrome[i] != 0) {
corruptionFound = true;
}
}
return !corruptionFound;
}
}
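
The new computeErrorLocations() expects the stripe laid out as [parity | message] and can resolve at most paritySize/2 error locations. The sketch below mirrors the usage that TestErasureCodes.verifyErrorLocations() exercises later in this patch (encode, corrupt two symbols, locate); the class name and fixed seed are illustrative, and the RS(10, 4) sizes are the ones the new test uses.

import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.raid.ReedSolomonCode;

public class ErrorLocationSketch {
  public static void main(String[] args) {
    final int stripeSize = 10, paritySize = 4;
    Random rand = new Random(12345);

    int[] message = new int[stripeSize];
    int[] parity = new int[paritySize];
    for (int i = 0; i < stripeSize; i++) {
      message[i] = rand.nextInt(256);
    }
    ReedSolomonCode codec = new ReedSolomonCode(stripeSize, paritySize);
    codec.encode(message, parity);

    // computeErrorLocations() wants parity first, then the message.
    int[] data = new int[paritySize + stripeSize];
    System.arraycopy(parity, 0, data, 0, paritySize);
    System.arraycopy(message, 0, data, paritySize, stripeSize);

    // Corrupt two symbols; up to paritySize / 2 errors are resolvable.
    data[1] ^= 0x5a;
    data[7] ^= 0x3c;

    Set<Integer> errorLocations = new HashSet<Integer>();
    boolean resolved = codec.computeErrorLocations(data, errorLocations);
    System.out.println("resolved=" + resolved + " locations=" + errorLocations);
  }
}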


@ -47,8 +47,8 @@
public class TestRaidDfs extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
final static String LOG_DIR = "/raidlog";
"target/test-data")).getAbsolutePath();
final static String LOG_DIR = "target/raidlog";
final static long RELOAD_INTERVAL = 1000;
final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
final static int NUM_DATANODES = 3;
@ -414,6 +414,7 @@ public static boolean validateFile(FileSystem fileSys, Path name, long length,
LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
if (newcrc.getValue() != crc) {
LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
return false;
}
return true;
}


@ -26,12 +26,14 @@
import java.util.Random;
import java.util.zip.CRC32;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -39,6 +41,8 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -53,9 +57,11 @@ public class TestBlockFixer {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestBlockFixer");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
public static final String DistBlockFixer_JAR =
JarFinder.getJar(DistBlockFixer.class);
final static long RELOAD_INTERVAL = 1000;
final static int NUM_DATANODES = 3;
Configuration conf;
@ -546,6 +552,8 @@ protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
conf.setBoolean("dfs.permissions", false);
conf.set("mapreduce.framework.name", "yarn");
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
@ -553,11 +561,28 @@ protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
FileSystem.setDefaultUri(conf, namenode);
mr = new MiniMRCluster(4, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
conf.set("mapreduce.framework.name", "yarn");
String rmAdress = jobConf.get("yarn.resourcemanager.address");
if (rmAdress != null) {
conf.set("yarn.resourcemanager.address", rmAdress);
}
String schedulerAdress =
jobConf.get("yarn.resourcemanager.scheduler.address");
if (schedulerAdress != null) {
conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
}
String jobHistoryAddress =
jobConf.get("mapreduce.jobhistory.address");
if (jobHistoryAddress != null) {
conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
}
conf.set(JobContext.JAR, TestBlockFixer.DistBlockFixer_JAR);
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("<?xml version=\"1.0\"?>\n");
@ -609,10 +634,11 @@ protected void myTearDown() throws Exception {
if (dfs != null) { dfs.shutdown(); }
}
private long getCRC(FileSystem fs, Path p) throws IOException {
public static long getCRC(FileSystem fs, Path p) throws IOException {
CRC32 crc = new CRC32();
FSDataInputStream stm = fs.open(p);
for (int b = 0; b > 0; b = stm.read()) {
int b;
while ((b = stm.read())>=0) {
crc.update(b);
}
stm.close();
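
The same block of conditional conf.set() calls for yarn.resourcemanager.address, yarn.resourcemanager.scheduler.address and mapreduce.jobhistory.address is repeated below in TestRaidHar, TestRaidNode and TestRaidPurge. Here is a sketch of a helper that would express the pattern once; the helper is hypothetical and not part of this patch.

// Hypothetical helper (not in the patch): copy selected MiniMRCluster settings
// from its JobConf into the test Configuration only when they are present.
import org.apache.hadoop.conf.Configuration;

public final class MiniClusterConfSketch {
  private MiniClusterConfSketch() {}

  public static void copyIfSet(Configuration from, Configuration to,
      String... keys) {
    for (String key : keys) {
      String value = from.get(key);
      if (value != null) {
        to.set(key, value);
      }
    }
  }
}

Each test setup could then call copyIfSet(jobConf, conf, "yarn.resourcemanager.address", "yarn.resourcemanager.scheduler.address", "mapreduce.jobhistory.address") instead of repeating the three null checks.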


@ -40,7 +40,7 @@ public class TestDirectoryTraversal extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestDirectoryTraversal");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
MiniDFSCluster dfs = null;
FileSystem fs = null;
@ -211,7 +211,7 @@ private void createTestFile(Path file) throws IOException {
private void mySetup() throws IOException {
conf = new Configuration();
dfs = new MiniDFSCluster(conf, 6, true, null);
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
dfs.waitActive();
fs = dfs.getFileSystem();
}


@ -169,6 +169,57 @@ public void testXorPerformance() {
assertTrue("Decode failed", java.util.Arrays.equals(copy, message[0]));
}
public void testComputeErrorLocations() {
for (int i = 0; i < TEST_TIMES; ++i) {
verifyErrorLocations(10, 4, 1);
verifyErrorLocations(10, 4, 2);
}
}
public void verifyErrorLocations(int stripeSize, int paritySize, int errors) {
int[] message = new int[stripeSize];
int[] parity = new int[paritySize];
Set<Integer> errorLocations = new HashSet<Integer>();
for (int i = 0; i < message.length; ++i) {
message[i] = RAND.nextInt(256);
}
while (errorLocations.size() < errors) {
int loc = RAND.nextInt(stripeSize + paritySize);
errorLocations.add(loc);
}
ReedSolomonCode codec = new ReedSolomonCode(stripeSize, paritySize);
codec.encode(message, parity);
int[] data = combineArrays(parity, message);
for (Integer i : errorLocations) {
data[i] = randError(data[i]);
}
Set<Integer> recoveredLocations = new HashSet<Integer>();
boolean resolved = codec.computeErrorLocations(data, recoveredLocations);
if (resolved) {
assertEquals(errorLocations, recoveredLocations);
}
}
private int randError(int actual) {
while (true) {
int r = RAND.nextInt(256);
if (r != actual) {
return r;
}
}
}
private int[] combineArrays(int[] array1, int[] array2) {
int[] result = new int[array1.length + array2.length];
for (int i = 0; i < array1.length; ++i) {
result[i] = array1[i];
}
for (int i = 0; i < array2.length; ++i) {
result[i + array1.length] = array2[i];
}
return result;
}
private int[] randomErasedLocation(int erasedLen, int dataLen) {
int[] erasedLocations = new int[erasedLen];
for (int i = 0; i < erasedLen; i++) {


@ -36,7 +36,7 @@
public class TestRaidFilter extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static Log LOG =
LogFactory.getLog("org.apache.hadoop.raid.TestRaidFilter");


@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.util.Random;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,6 +35,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
/**
@ -41,7 +43,7 @@
*/
public class TestRaidHar extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@ -96,11 +98,27 @@ private void createClusters(boolean local) throws Exception {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
conf.set("mapreduce.framework.name", "yarn");
String rmAdress = jobConf.get("yarn.resourcemanager.address");
if (rmAdress != null) {
conf.set("yarn.resourcemanager.address", rmAdress);
}
String schedulerAdress =
jobConf.get("yarn.resourcemanager.scheduler.address");
if (schedulerAdress != null) {
conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
}
String jobHistoryAddress =
jobConf.get("mapreduce.jobhistory.address");
if (jobHistoryAddress != null) {
conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
}
}
/**


@ -37,9 +37,13 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
@ -49,7 +53,8 @@
*/
public class TestRaidNode extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
public static final String DistRaid_JAR = JarFinder.getJar(DistRaid.class);
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@ -76,6 +81,8 @@ private void createClusters(boolean local) throws Exception {
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
conf.setBoolean("dfs.permissions.enabled", true);
conf.setLong(JobMonitor.JOBMONITOR_INTERVAL_KEY, 20000);
conf.setLong(RaidNode.TRIGGER_MONITOR_SLEEP_TIME_KEY, 3000L);
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
@ -103,11 +110,27 @@ private void createClusters(boolean local) throws Exception {
namenode = fileSys.getUri().toString();
final int taskTrackers = 4;
mr = new MiniMRCluster(taskTrackers, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
conf.set("mapreduce.framework.name", "yarn");
String rmAdress = jobConf.get("yarn.resourcemanager.address");
if (rmAdress != null) {
conf.set("yarn.resourcemanager.address", rmAdress);
}
String schedulerAdress =
jobConf.get("yarn.resourcemanager.scheduler.address");
if (schedulerAdress != null) {
conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
}
String jobHistoryAddress =
jobConf.get("mapreduce.jobhistory.address");
if (jobHistoryAddress != null) {
conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
}
}
class ConfigBuilder {
@ -238,9 +261,9 @@ public void testPathFilter() throws Exception {
LOG.info("Test testPathFilter started.");
long blockSizes [] = {1024L};
long stripeLengths [] = {1, 2, 5, 6, 10, 11, 12};
long targetReplication = 1;
long metaReplication = 1;
int stripeLengths [] = {5, 6, 10, 11, 12};
int targetReplication = 1;
int metaReplication = 1;
int numBlock = 11;
int iter = 0;
@ -284,7 +307,8 @@ private void doTestPathFilter(int iter, long targetReplication,
LOG.info("doTestPathFilter created test files for iteration " + iter);
// create an instance of the RaidNode
cnode = RaidNode.createRaidNode(null, conf);
Configuration localConf = new Configuration(conf);
cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
// wait till file is raided
@ -314,7 +338,6 @@ private void doTestPathFilter(int iter, long targetReplication,
}
// assertEquals(listPaths.length, 1); // all files raided
LOG.info("doTestPathFilter all files found in Raid.");
Thread.sleep(20000); // Without this wait, unit test crashes
// check for error at beginning of file
shell = new RaidShell(conf);
@ -466,16 +489,23 @@ private void doCheckPolicy() throws Exception {
LOG.info("doCheckPolicy completed:");
}
private void createTestFiles(String path, String destpath) throws IOException {
static public void createTestFiles(FileSystem fileSys,
String path, String destpath, int nfile,
int nblock) throws IOException {
createTestFiles(fileSys, path, destpath, nfile, nblock, (short)1);
}
static void createTestFiles(FileSystem fileSys, String path, String destpath, int nfile,
int nblock, short repl) throws IOException {
long blockSize = 1024L;
Path dir = new Path(path);
Path destPath = new Path(destpath);
fileSys.delete(dir, true);
fileSys.delete(destPath, true);
for(int i = 0 ; i < 10; i++){
for(int i = 0 ; i < nfile; i++){
Path file = new Path(path + "file" + i);
createOldFile(fileSys, file, 1, 7, blockSize);
createOldFile(fileSys, file, repl, nblock, blockSize);
}
}
@ -499,12 +529,15 @@ public void testDistRaid() throws Exception {
RaidNode cnode = null;
try {
createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
createTestFiles("/user/dhruba/raidtest2/", "/destraid/user/dhruba/raidtest2");
createTestFiles(fileSys, "/user/dhruba/raidtest/",
"/destraid/user/dhruba/raidtest", 5, 7);
createTestFiles(fileSys, "/user/dhruba/raidtest2/",
"/destraid/user/dhruba/raidtest2", 5, 7);
LOG.info("Test testDistRaid created test files");
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
cnode = RaidNode.createRaidNode(null, localConf);
// Verify the policies are parsed correctly
for (PolicyList policyList : cnode.getAllPolicies()) {
@ -540,15 +573,13 @@ public void testDistRaid() throws Exception {
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
start = System.currentTimeMillis();
while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
assertEquals(dcnode.jobMonitor.jobsSucceeded(), dcnode.jobMonitor.jobsMonitored());
LOG.info("Test testDistRaid successful.");
} catch (Exception e) {
@ -647,24 +678,19 @@ public void testSuspendTraversal() throws Exception {
RaidNode cnode = null;
try {
createTestFiles(
"/user/dhruba/raidtest/1/", "/destraid/user/dhruba/raidtest/1");
createTestFiles(
"/user/dhruba/raidtest/2/", "/destraid/user/dhruba/raidtest/2");
createTestFiles(
"/user/dhruba/raidtest/3/", "/destraid/user/dhruba/raidtest/3");
createTestFiles(
"/user/dhruba/raidtest/4/", "/destraid/user/dhruba/raidtest/4");
for(int i = 0; i < 4; i++){
Path file = new Path("/user/dhruba/raidtest/dir" + i + "/file" + i);
createOldFile(fileSys, file, 1, 7, 1024L);
}
LOG.info("Test testSuspendTraversal created test files");
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
localConf.setInt("raid.distraid.max.files", 3);
localConf.setInt("raid.distraid.max.jobs", 2);
localConf.setInt("raid.distraid.max.files", 2);
localConf.setInt("raid.directorytraversal.threads", 1);
// This is too dependent on the implementation of getFilteredFiles().
// It relies on the threading behavior where two directories are traversed
// before returning because the list of files is modified in a separate
// thread from the one that decides if there are enough files.
localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
// 4 test files: 2 jobs with 2 files each.
final int numJobsExpected = 2;
cnode = RaidNode.createRaidNode(null, localConf);
@ -677,10 +703,20 @@ public void testSuspendTraversal() throws Exception {
start = System.currentTimeMillis();
while (dcnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
LOG.info("Waiting for num jobs succeeded " + dcnode.jobMonitor.jobsSucceeded() +
" to reach " + numJobsExpected);
Thread.sleep(3000);
}
// Wait for any running jobs to finish.
start = System.currentTimeMillis();
while (dcnode.jobMonitor.runningJobsCount() > 0 &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
LOG.info("Waiting for zero running jobs: " +
dcnode.jobMonitor.runningJobsCount());
Thread.sleep(1000);
}
assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
assertEquals(numJobsExpected, dcnode.jobMonitor.jobsMonitored());
assertEquals(numJobsExpected, dcnode.jobMonitor.jobsSucceeded());
LOG.info("Test testSuspendTraversal successful.");


@ -51,6 +51,7 @@
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.raid.protocol.PolicyInfo;
/**
@ -58,7 +59,7 @@
*/
public class TestRaidPurge extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@ -113,11 +114,27 @@ private void createClusters(boolean local) throws Exception {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
conf.set("mapreduce.framework.name", "yarn");
String rmAdress = jobConf.get("yarn.resourcemanager.address");
if (rmAdress != null) {
conf.set("yarn.resourcemanager.address", rmAdress);
}
String schedulerAdress =
jobConf.get("yarn.resourcemanager.scheduler.address");
if (schedulerAdress != null) {
conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
}
String jobHistoryAddress =
jobConf.get("mapreduce.jobhistory.address");
if (jobHistoryAddress != null) {
conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
}
}
/**
@ -235,6 +252,7 @@ private void doTestPurge(int iter, long targetReplication,
// create an instance of the RaidNode
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
@ -299,7 +317,7 @@ public void testPurgeHar() throws Exception {
createClusters(true);
mySetup(1, 1, 5, harDelay);
Path dir = new Path("/user/dhruba/raidtest/");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
Path destPath = new Path("/raid/user/dhruba/raidtest");
Path file1 = new Path(dir + "/file");
RaidNode cnode = null;
try {
@ -308,7 +326,6 @@ public void testPurgeHar() throws Exception {
// create an instance of the RaidNode
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
cnode = RaidNode.createRaidNode(null, localConf);
// Wait till har is created.
@ -334,14 +351,7 @@ public void testPurgeHar() throws Exception {
boolean found = false;
FileStatus[] listPaths = null;
while (!found || listPaths == null || listPaths.length > 1) {
try {
listPaths = fileSys.listStatus(destPath);
} catch (FileNotFoundException e) {
// If the parent directory is deleted because the har is deleted
// and the parent is empty, try again.
Thread.sleep(1000);
continue;
}
if (listPaths != null) {
for (FileStatus s: listPaths) {
LOG.info("testPurgeHar waiting for parity file to be recreated" +


@ -47,7 +47,7 @@ public class TestRaidShell extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestRaidShell");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@ -249,7 +249,8 @@ private void myTearDown() throws Exception {
private long getCRC(FileSystem fs, Path p) throws IOException {
CRC32 crc = new CRC32();
FSDataInputStream stm = fs.open(p);
for (int b = 0; b > 0; b = stm.read()) {
int b;
while ((b = stm.read())>=0) {
crc.update(b);
}
stm.close();


@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -51,8 +51,8 @@ public class TestRaidShellFsck {
LogFactory.getLog("org.apache.hadoop.raid.TestRaidShellFsck");
final static String TEST_DIR =
new File(System.
getProperty("test.build.data", "build/contrib/raid/test/data")).
getAbsolutePath();
getProperty("test.build.data", "target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").
getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;


@ -42,7 +42,7 @@ public class TestReedSolomonDecoder extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestReedSolomonDecoder");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static int NUM_DATANODES = 3;
Configuration conf;


@ -49,7 +49,7 @@ public class TestReedSolomonEncoder extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestReedSolomonEncoder");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
"target/test-data")).getAbsolutePath();
final static int NUM_DATANODES = 3;
Configuration conf;


@ -31,7 +31,7 @@
/** I-node for closed file. */
@InterfaceAudience.Private
class INodeFile extends INode implements BlockCollection {
public class INodeFile extends INode implements BlockCollection {
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
//Number of bits for Block size


@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<module>hadoop-hdfs</module>
<module>hadoop-hdfs-httpfs</module>
<module>hadoop-hdfs/src/contrib/bkjournal</module>
<module>hadoop-hdfs-raid</module>
</modules>
<build>


@ -118,6 +118,8 @@ Trunk (unreleased changes)
MAPREDUCE-3990. MRBench allows Long-sized input-lines value
but parses CLI argument as an Integer. (harsh)
MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen)
Branch-2 ( Unreleased changes )
INCOMPATIBLE CHANGES


@ -257,6 +257,12 @@
<artifactId>hadoop-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-raid</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>