diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-raid-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-raid-dist.xml
new file mode 100644
index 0000000000..a7da364e72
--- /dev/null
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-raid-dist.xml
@@ -0,0 +1,60 @@
+
+
+
+ hadoop-raid-dist
+
+ dir
+
+ false
+
+
+
+ ${basedir}/src/main/conf
+ /etc/hadoop
+
+ *
+
+
+
+ ${basedir}/src/main/sbin
+ /sbin
+
+ *
+
+ 0755
+
+
+ ${basedir}/src/main/libexec
+ /libexec
+
+ *
+
+ 0755
+
+
+
+ ${project.build.directory}/site
+ /share/doc/hadoop/raid
+
+
+
+
+ /share/hadoop/${hadoop.component}/lib
+ false
+ runtime
+ true
+
+
+
diff --git a/hadoop-dist/pom.xml b/hadoop-dist/pom.xml
index 7fc6e563db..442cdcac33 100644
--- a/hadoop-dist/pom.xml
+++ b/hadoop-dist/pom.xml
@@ -52,6 +52,11 @@
hadoop-yarn-api
provided
+
+ org.apache.hadoop
+ hadoop-hdfs-raid
+ provided
+
@@ -120,6 +125,7 @@
run cp -r $ROOT/hadoop-common-project/hadoop-common/target/hadoop-common-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs/target/hadoop-hdfs-${project.version}/* .
run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-httpfs/target/hadoop-hdfs-httpfs-${project.version}/* .
+ run cp -r $ROOT/hadoop-hdfs-project/hadoop-hdfs-raid/target/hadoop-hdfs-raid-${project.version}/* .
run cp -r $ROOT/hadoop-mapreduce-project/target/hadoop-mapreduce-${project.version}/* .
run cp -r $ROOT/hadoop-tools/hadoop-tools-dist/target/hadoop-tools-dist-${project.version}/* .
echo
diff --git a/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml
new file mode 100644
index 0000000000..f4012b93f2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml
@@ -0,0 +1,202 @@
+
+
+
+ 4.0.0
+
+ org.apache.hadoop
+ hadoop-project-dist
+ 3.0.0-SNAPSHOT
+ ../../hadoop-project-dist
+
+ org.apache.hadoop
+ hadoop-hdfs-raid
+ 3.0.0-SNAPSHOT
+ jar
+
+ Apache Hadoop HDFS Raid
+ Apache Hadoop HDFS Raid
+
+
+
+ raid
+ false
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.apache.hadoop
+ hadoop-annotations
+ provided
+
+
+ org.apache.hadoop
+ hadoop-minicluster
+ test
+
+
+ org.apache.hadoop
+ hadoop-client
+ provided
+
+
+ org.apache.hadoop
+ hadoop-archives
+ provided
+
+
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-mrapp-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mrapp-generated-classpath
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+
+ ${basedir}/dev-support/findbugsExcludeFile.xml
+
+
+
+
+
+
+
+ docs
+
+ false
+
+
+
+
+ org.apache.maven.plugins
+ maven-site-plugin
+
+
+ docs
+ prepare-package
+
+ site
+
+
+
+
+
+
+
+
+
+ dist
+
+ false
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ org.apache.hadoop
+ hadoop-assemblies
+ ${project.version}
+
+
+
+
+ dist
+ package
+
+ single
+
+
+ ${project.artifactId}-${project.version}
+ false
+ false
+
+ hadoop-raid-dist
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ tar
+ package
+
+ run
+
+
+
+
+
+
+ which cygpath 2> /dev/null
+ if [ $? = 1 ]; then
+ BUILD_DIR="${project.build.directory}"
+ else
+ BUILD_DIR=`cygpath --unix '${project.build.directory}'`
+ fi
+ cd $BUILD_DIR
+ tar czf ${project.artifactId}-${project.version}.tar.gz ${project.artifactId}-${project.version}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-mapreduce-project/src/contrib/raid/conf/raid.xml b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/conf/raid.xml
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/conf/raid.xml
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/conf/raid.xml
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRaid.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
similarity index 63%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
index a29a3ca1b1..7eb6e16c55 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
@@ -34,7 +34,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -56,8 +58,10 @@ public class RaidBlockSender implements java.io.Closeable {
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
+ /** Initial position to read */
+ private long initialOffset;
private long endOffset; // ending position
- private int bytesPerChecksum; // chunk size
+ private int chunkSize; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
private boolean chunkOffsetOK; // if need to send chunk offset
@@ -74,6 +78,8 @@ public class RaidBlockSender implements java.io.Closeable {
* not sure if there will be much more improvement.
*/
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+ private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
+ HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
private volatile ChunkChecksum lastChunkChecksum = null;
@@ -125,12 +131,13 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
* is mostly corrupted. For now just truncate bytesPerchecksum to
* blockLength.
*/
- bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
+ int size = checksum.getBytesPerChecksum();
+ if (size > 10*1024*1024 && size > replicaVisibleLength) {
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
Math.max((int)replicaVisibleLength, 10*1024*1024));
- bytesPerChecksum = checksum.getBytesPerChecksum();
+ size = checksum.getBytesPerChecksum();
}
+ chunkSize = size;
checksumSize = checksum.getChecksumSize();
if (length < 0) {
@@ -147,12 +154,12 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
throw new IOException(msg);
}
- offset = (startOffset - (startOffset % bytesPerChecksum));
+ offset = (startOffset - (startOffset % chunkSize));
if (length >= 0) {
// Make sure endOffset points to end of a checksumed chunk.
long tmpLen = startOffset + length;
- if (tmpLen % bytesPerChecksum != 0) {
- tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ if (tmpLen % chunkSize != 0) {
+ tmpLen += (chunkSize - tmpLen % chunkSize);
}
if (tmpLen < endOffset) {
// will use on-disk checksum here since the end is a stable chunk
@@ -162,7 +169,7 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
// seek to the right offsets
if (offset > 0) {
- long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+ long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {
// Should we use seek() for checksum file as well?
@@ -178,7 +185,7 @@ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset,
throw ioe;
}
}
-
+
/**
* close opened files.
*/
@@ -227,57 +234,85 @@ private static IOException ioeToSocketException(IOException ioe) {
// otherwise just return the same exception.
return ioe;
}
-
+
/**
- * Sends upto maxChunks chunks of data.
- *
- * When blockInPosition is >= 0, assumes 'out' is a
- * {@link SocketOutputStream} and tries
- * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
- * send data (and updates blockInPosition).
+ * @param datalen Length of data
+ * @return number of chunks for data of given size
*/
- private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
- throws IOException {
- // Sends multiple chunks in one packet with a single write().
-
- int len = (int) Math.min(endOffset - offset,
- (((long) bytesPerChecksum) * ((long) maxChunks)));
- int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
- int packetLen = len + numChunks*checksumSize + 4;
- boolean lastDataPacket = offset + len == endOffset && len > 0;
+ private int numberOfChunks(long datalen) {
+ return (int) ((datalen + chunkSize - 1)/chunkSize);
+ }
+
+ /**
+ * Write packet header into {@code pkt}
+ */
+ private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
-
-
- PacketHeader header = new PacketHeader(
- packetLen, offset, seqno, (len == 0), len);
+ PacketHeader header = new PacketHeader(packetLen, offset, seqno,
+ (dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
+ }
+
+ /**
+ * Read checksum into given buffer
+ * @param buf buffer to read the checksum into
+ * @param checksumOffset offset at which to write the checksum into buf
+ * @param checksumLen length of checksum to write
+ * @throws IOException on error
+ */
+ private void readChecksum(byte[] buf, final int checksumOffset,
+ final int checksumLen) throws IOException {
+ if (checksumSize <= 0 && checksumIn == null) {
+ return;
+ }
+ try {
+ checksumIn.readFully(buf, checksumOffset, checksumLen);
+ } catch (IOException e) {
+ LOG.warn(" Could not read or failed to veirfy checksum for data"
+ + " at offset " + offset + " for block " + block, e);
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ if (corruptChecksumOk) {
+ if (checksumOffset < checksumLen) {
+ // Just fill the array with zeros.
+ Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Sends a packet with up to maxChunks chunks of data.
+ *
+ * @param pkt buffer used for writing packet data
+ * @param maxChunks maximum number of chunks to send
+ * @param out stream to send data to
+ * @param transferTo use transferTo to send data
+ * @param throttler used for throttling data transfer bandwidth
+ */
+ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+ boolean transferTo, DataTransferThrottler throttler) throws IOException {
+ int dataLen = (int) Math.min(endOffset - offset,
+ (chunkSize * (long) maxChunks));
+
+ int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
+ int checksumDataLen = numChunks * checksumSize;
+ int packetLen = dataLen + checksumDataLen + 4;
+ boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
+
+ writePacketHeader(pkt, dataLen, packetLen);
int checksumOff = pkt.position();
- int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) {
- try {
- checksumIn.readFully(buf, checksumOff, checksumLen);
- } catch (IOException e) {
- LOG.warn(" Could not read or failed to veirfy checksum for data" +
- " at offset " + offset + " for block " + block + " got : "
- + StringUtils.stringifyException(e));
- IOUtils.closeStream(checksumIn);
- checksumIn = null;
- if (corruptChecksumOk) {
- if (checksumOff < checksumLen) {
- // Just fill the array with zeros.
- Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
- }
- } else {
- throw e;
- }
- }
+ readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
- int start = checksumOff + checksumLen - checksumSize;
+ int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
@@ -286,61 +321,85 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
}
}
- int dataOff = checksumOff + checksumLen;
-
- if (blockInPosition < 0) {
- //normal transfer
- IOUtils.readFully(blockIn, buf, dataOff, len);
+ int dataOff = checksumOff + checksumDataLen;
+ if (!transferTo) { // normal transfer
+ IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
- int dOff = dataOff;
- int cOff = checksumOff;
- int dLeft = len;
-
- for (int i=0; i<numChunks; i++) {
- checksum.reset();
- int dLen = Math.min(dLeft, bytesPerChecksum);
- checksum.update(buf, dOff, dLen);
- if (!checksum.compare(buf, cOff)) {
- long failedPos = offset + len - dLeft;
- throw new ChecksumException("Checksum failed at " + failedPos, failedPos);
- }
- dLeft -= dLen;
- dOff += dLen;
- cOff += checksumSize;
- }
+ verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}
try {
- if (blockInPosition >= 0) {
- //use transferTo(). Checks on out and blockIn are already done.
-
+ if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
- //first write the packet
- sockOut.write(buf, 0, dataOff);
+ sockOut.write(buf, 0, dataOff); // First write checksum
+
// no need to flush. since we know out is not a buffered stream.
-
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
- blockInPosition, len);
-
- blockInPosition += len;
- } else {
+ blockInPosition, dataLen);
+ blockInPosition += dataLen;
+ } else {
// normal transfer
- out.write(buf, 0, dataOff + len);
+ out.write(buf, 0, dataOff + dataLen);
}
-
} catch (IOException e) {
- /* exception while writing to the client (well, with transferTo(),
- * it could also be while reading from the local file).
+ /* Exception while writing to the client. Connection closure from
+ * the other end is mostly the case and we do not care much about
+ * it. But other things can go wrong, especially in transferTo(),
+ * which we do not want to ignore.
+ *
+ * The message parsing below should not be considered as a good
+ * coding example. NEVER do it to drive a program logic. NEVER.
+ * It was done here because the NIO throws an IOException for EPIPE.
*/
+ String ioem = e.getMessage();
+ if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
+ LOG.error("BlockSender.sendChunks() exception: ", e);
+ }
throw ioeToSocketException(e);
}
- return len;
+ if (throttler != null) { // rebalancing so throttle
+ throttler.throttle(packetLen);
+ }
+
+ return dataLen;
}
+
+ /**
+ * Compute checksum for chunks and verify the checksum that is read from
+ * the metadata file is correct.
+ *
+ * @param buf buffer that has checksum and data
+ * @param dataOffset position where data is written in the buf
+ * @param datalen length of data
+ * @param numChunks number of chunks corresponding to data
+ * @param checksumOffset offset where checksum is written in the buf
+ * @throws ChecksumException on failed checksum verification
+ */
+ public void verifyChecksum(final byte[] buf, final int dataOffset,
+ final int datalen, final int numChunks, final int checksumOffset)
+ throws ChecksumException {
+ int dOff = dataOffset;
+ int cOff = checksumOffset;
+ int dLeft = datalen;
+
+ for (int i = 0; i < numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, chunkSize);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ long failedPos = offset + datalen - dLeft;
+ throw new ChecksumException("Checksum failed at " + failedPos,
+ failedPos);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
+ }
+ }
+
/**
* sendBlock() is used to read block and its metadata and stream the data to
@@ -356,79 +415,61 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
*/
public long sendBlock(DataOutputStream out, OutputStream baseStream)
throws IOException {
- if( out == null ) {
+ if (out == null) {
throw new IOException( "out stream is null" );
}
-
- long initialOffset = offset;
+ initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
- try {
- checksum.writeHeader(out);
- if ( chunkOffsetOK ) {
- out.writeLong( offset );
- }
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
- }
-
int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN;
-
- if (transferToAllowed && !verifyChecksum &&
- baseStream instanceof SocketOutputStream &&
- blockIn instanceof FileInputStream) {
-
+ boolean transferTo = transferToAllowed && !verifyChecksum
+ && baseStream instanceof SocketOutputStream
+ && blockIn instanceof FileInputStream;
+ if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-
- // blockInPosition also indicates sendChunks() uses transferTo.
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
+ maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
- // assure a mininum buffer size.
- maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE,
- MIN_BUFFER_WITH_TRANSFERTO)
- + bytesPerChecksum - 1)/bytesPerChecksum;
-
- // allocate smaller buffer while using transferTo().
+ // Smaller packet size to only hold checksum when doing transferTo
pktSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
- (HdfsConstants.IO_FILE_BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
- pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
+ // Packet size includes both checksum and data
+ pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
- long len = sendChunks(pktBuf, maxChunksPerPacket,
- streamForSendChunks);
+ long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
+ transferTo, null);
offset += len;
- totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
- checksumSize);
+ totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
try {
// send an empty packet to mark the end of the block
- sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
+ sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+ null);
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
+ blockReadFully = true;
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
- ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+ ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+ initialOffset, endTime - startTime));
}
close();
}
-
- blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
return totalRead;
}
@@ -440,6 +481,13 @@ public static interface InputStreamFactory {
public InputStream createStream(long offset) throws IOException;
}
+ /**
+ * @return the checksum type that will be used with this block transfer.
+ */
+ public DataChecksum getChecksum() {
+ return checksum;
+ }
+
private static class BlockInputStreamFactory implements InputStreamFactory {
private final ExtendedBlock block;
private final FsDatasetSpi<?> data;
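
Editor's note: the refactor above splits the old sendChunks() into writePacketHeader(), readChecksum(), verifyChecksum() and sendPacket(), with numberOfChunks() centralizing the ceiling-division chunk math. The standalone sketch below reproduces that arithmetic; chunkSize = 512 and checksumSize = 4 are assumed typical values for illustration, not read from the patch or from Hadoop configuration.

// A minimal sketch of the packet sizing used by sendPacket(); constants are
// illustrative assumptions, not Hadoop configuration values.
public class PacketSizeSketch {
  static int numberOfChunks(long datalen, int chunkSize) {
    return (int) ((datalen + chunkSize - 1) / chunkSize); // ceiling division
  }

  public static void main(String[] args) {
    int chunkSize = 512;       // bytes of data covered by one checksum
    int checksumSize = 4;      // bytes per CRC32 checksum
    int dataLen = 64 * 1024;   // data carried by one packet
    int numChunks = numberOfChunks(dataLen, chunkSize);       // 128 chunks
    int packetLen = dataLen + numChunks * checksumSize + 4;   // same formula as sendPacket()
    System.out.println(numChunks + " chunks, packetLen=" + packetLen + " bytes");
  }
}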
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
index 531a0f238e..9258696156 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
@@ -50,7 +50,7 @@ public static LocatedBlocks getBlockLocations(final FSNamesystem namesystem,
final boolean doAccessTime, final boolean needBlockToken
) throws FileNotFoundException, UnresolvedLinkException, IOException {
return namesystem.getBlockLocations(src, offset, length,
- doAccessTime, needBlockToken);
+ doAccessTime, needBlockToken, true);
}
}
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java
similarity index 97%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java
index 6e1d7f7917..dd2fb96c2b 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.raid;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -44,14 +47,17 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +67,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.net.NetUtils;
@@ -649,7 +656,7 @@ static DataInputStream computeMetadata(Configuration conf,
mdOut.writeShort(BlockMetadataHeader.VERSION);
// Create a summer and write out its header.
- int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+ int bytesPerChecksum = conf.getInt("dfs.bytes-per-checksum", 512);
DataChecksum sum =
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
@@ -709,8 +716,8 @@ private void computeMetadataAndSendFixedBlock(DatanodeInfo datanode,
blockContents.close();
// Reopen
blockContents = new FileInputStream(localBlockFile);
- sendFixedBlock(datanode, blockContents, blockMetadata, block,
- blockSize);
+ sendFixedBlock(datanode, blockContents, blockMetadata, block,
+ blockSize);
} finally {
if (blockContents != null) {
blockContents.close();
@@ -780,9 +787,11 @@ private void sendFixedBlock(DatanodeInfo datanode,
});
DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
+ DataChecksum checksum = blockSender.getChecksum();
new Sender(out).writeBlock(block.getBlock(), block.getBlockToken(), "",
nodes, null, BlockConstructionStage.PIPELINE_SETUP_CREATE,
- 1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(metadataIn));
+ 1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(
+ checksum.getChecksumType(), checksum.getBytesPerChecksum()));
blockSender.sendBlock(out, baseStream);
LOG.info("Sent block " + block.getBlock() + " to " + datanode.getName());
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ConfigManager.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ConfigManager.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/Decoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/Decoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java
similarity index 97%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java
index 4c955df005..c1ff9bccc4 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -112,6 +112,8 @@ public DirectoryTraversal(
public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
List<FileStatus> filtered = new ArrayList<FileStatus>();
+ if (limit == 0)
+ return filtered;
// We need this semaphore to block when the number of running workitems
// is equal to the number of threads. FixedThreadPool limits the number
@@ -120,20 +122,26 @@ public List getFilteredFiles(FileFilter filter, int limit) {
Semaphore slots = new Semaphore(numThreads);
while (true) {
- synchronized(filtered) {
- if (filtered.size() >= limit) break;
- }
FilterFileWorkItem work = null;
try {
+ slots.acquire();
+ synchronized(filtered) {
+ if (filtered.size() >= limit) {
+ slots.release();
+ break;
+ }
+ }
Node next = getNextDirectoryNode();
if (next == null) {
+ slots.release();
break;
}
work = new FilterFileWorkItem(filter, next, filtered, slots);
- slots.acquire();
} catch (InterruptedException ie) {
+ slots.release();
break;
} catch (IOException e) {
+ slots.release();
break;
}
executor.execute(work);
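
Editor's note: the getFilteredFiles() change above moves slots.acquire() ahead of the limit check and adds slots.release() on every early-exit path, so the fixed-size pool never sees more than numThreads outstanding work items and no permit leaks when the traversal stops early. The sketch below shows the same bounded-submission pattern in isolation; the work item (an even-number filter over a list of integers) is invented for the example and is not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Sketch of the acquire-before-check, release-on-every-exit pattern.
public class BoundedSubmitSketch {
  static List<Integer> collect(List<Integer> inputs, int limit, int numThreads)
      throws InterruptedException {
    final List<Integer> filtered = new ArrayList<Integer>();
    final Semaphore slots = new Semaphore(numThreads);
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    int next = 0;
    while (true) {
      slots.acquire();                  // block until a worker slot is free
      synchronized (filtered) {
        if (filtered.size() >= limit) { // check the limit while holding the slot
          slots.release();
          break;
        }
      }
      if (next >= inputs.size()) {      // nothing left to traverse
        slots.release();
        break;
      }
      final int value = inputs.get(next++);
      executor.execute(new Runnable() {
        public void run() {
          try {
            if (value % 2 == 0) {       // stand-in for the FileFilter check
              synchronized (filtered) {
                filtered.add(value);
              }
            }
          } finally {
            slots.release();            // the worker always returns its slot
          }
        }
      });
    }
    executor.shutdown();
    return filtered;
  }
}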
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistBlockFixer.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistBlockFixer.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java
similarity index 99%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java
index 81a3198f57..5c6e5cc79d 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java
@@ -277,6 +277,7 @@ public boolean startDistRaid() throws IOException {
*/
public boolean checkComplete() throws IOException {
JobID jobID = runningJob.getJobID();
+ LOG.info("Checking job " + jobID);
try {
if (runningJob.isComplete()) {
// delete job directory
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaidNode.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaidNode.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/Encoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/Encoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ErasureCode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ErasureCode.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ErasureCode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ErasureCode.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/GaloisField.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java
similarity index 86%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/GaloisField.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java
index 78b7af1b34..a10f5d79ea 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/GaloisField.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java
@@ -208,7 +208,7 @@ public void solveVandermondeSystem(int[] x, int[] y) {
* @param len consider x and y only from 0...len-1
*/
public void solveVandermondeSystem(int[] x, int[] y, int len) {
- assert(x.length <= len && y.length <= len);
+ assert(y.length <= len);
for (int i = 0; i < len - 1; i++) {
for (int j = len - 1; j > i; j--) {
y[j] = y[j] ^ mulTable[x[i]][y[j - 1]];
@@ -302,4 +302,49 @@ public int substitute(int[] p, int x) {
}
return result;
}
+
+ /**
+ * Perform Gaussian elimination on the given matrix. This matrix has to be a
+ * fat matrix (number of columns > number of rows).
+ */
+ public void gaussianElimination(int[][] matrix) {
+ assert(matrix != null && matrix.length > 0 && matrix[0].length > 0
+ && matrix.length < matrix[0].length);
+ int height = matrix.length;
+ int width = matrix[0].length;
+ for (int i = 0; i < height; i++) {
+ boolean pivotFound = false;
+ // scan the column for a nonzero pivot and swap it to the diagonal
+ for (int j = i; j < height; j++) {
+ if (matrix[j][i] != 0) {
+ int[] tmp = matrix[i];
+ matrix[i] = matrix[j];
+ matrix[j] = tmp;
+ pivotFound = true;
+ break;
+ }
+ }
+ if (!pivotFound) {
+ continue;
+ }
+ int pivot = matrix[i][i];
+ for (int j = i; j < width; j++) {
+ matrix[i][j] = divide(matrix[i][j], pivot);
+ }
+ for (int j = i + 1; j < height; j++) {
+ int lead = matrix[j][i];
+ for (int k = i; k < width; k++) {
+ matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
+ }
+ }
+ }
+ for (int i = height - 1; i >=0; i--) {
+ for (int j = 0; j < i; j++) {
+ int lead = matrix[j][i];
+ for (int k = i; k < width; k++) {
+ matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
+ }
+ }
+ }
+ }
}
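
Editor's note: the new gaussianElimination() reduces the matrix with field operations: forward elimination normalizes each pivot row with divide() and cancels the rows below it, then a second pass back-substitutes above the pivots, using add() for subtraction since the two coincide in GF(2^m). As a hedged illustration of the same two-pass structure, the sketch below works over GF(2), where addition is XOR and every nonzero pivot is 1; it is not the patch's code, only its shape.

// Gaussian elimination over GF(2) with the same forward/backward structure
// as GaloisField.gaussianElimination(); addition is XOR, division by a
// nonzero pivot is the identity.
public class Gf2EliminationSketch {
  static void gaussianElimination(int[][] m) {
    int height = m.length, width = m[0].length;
    for (int i = 0; i < height; i++) {
      for (int j = i; j < height; j++) {      // find a pivot in column i
        if (m[j][i] != 0) {
          int[] tmp = m[i]; m[i] = m[j]; m[j] = tmp;
          break;
        }
      }
      if (m[i][i] == 0) {
        continue;                             // no pivot in this column
      }
      for (int j = i + 1; j < height; j++) {  // cancel entries below the pivot
        if (m[j][i] != 0) {
          for (int k = i; k < width; k++) {
            m[j][k] ^= m[i][k];
          }
        }
      }
    }
    for (int i = height - 1; i >= 0; i--) {   // back-substitute above the pivots
      if (m[i][i] == 0) {
        continue;
      }
      for (int j = 0; j < i; j++) {
        if (m[j][i] != 0) {
          for (int k = i; k < width; k++) {
            m[j][k] ^= m[i][k];
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    // Augmented system over GF(2): x0 ^ x1 = 1 and x1 = 1, so x0 = 0, x1 = 1.
    int[][] m = { {1, 1, 1}, {0, 1, 1} };
    gaussianElimination(m);
    System.out.println("x0=" + m[0][2] + " x1=" + m[1][2]);
  }
}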
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/HarIndex.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/HarIndex.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java
similarity index 93%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java
index e01fcba73e..cda295c65a 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java
@@ -44,12 +44,13 @@ class JobMonitor implements Runnable {
volatile boolean running = true;
private Map<String, List<DistRaid>> jobs;
+ public static final String JOBMONITOR_INTERVAL_KEY = "raid.jobmonitor.interval";
private long jobMonitorInterval;
private volatile long jobsMonitored = 0;
private volatile long jobsSucceeded = 0;
public JobMonitor(Configuration conf) {
- jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+ jobMonitorInterval = conf.getLong(JOBMONITOR_INTERVAL_KEY, 60000);
jobs = new java.util.HashMap<String, List<DistRaid>>();
}
@@ -112,6 +113,7 @@ public void doMonitor() {
} catch (IOException ioe) {
// If there was an error, consider the job finished.
addJob(finishedJobs, key, job);
+ LOG.error("JobMonitor exception", ioe);
}
}
}
@@ -159,6 +161,17 @@ public long jobsMonitored() {
public long jobsSucceeded() {
return this.jobsSucceeded;
}
+
+ // For test code
+ int runningJobsCount() {
+ int total = 0;
+ synchronized(jobs) {
+ for (String key: jobs.keySet()) {
+ total += jobs.get(key).size();
+ }
+ }
+ return total;
+ }
private static void addJob(Map<String, List<DistRaid>> jobsMap,
String jobName, DistRaid job) {
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/LocalBlockFixer.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/LocalBlockFixer.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/LocalRaidNode.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/LocalRaidNode.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ParityInputStream.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ParityInputStream.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidConfigurationException.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidConfigurationException.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidConfigurationException.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidFilter.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidFilter.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java
similarity index 99%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java
index 48329d357f..dc196494cf 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java
@@ -80,6 +80,8 @@ public abstract class RaidNode implements RaidProtocol {
}
public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
public static final long SLEEP_TIME = 10000L; // 10 seconds
+ public static final String TRIGGER_MONITOR_SLEEP_TIME_KEY =
+ "hdfs.raid.trigger.monitor.sleep.time";
public static final int DEFAULT_PORT = 60000;
// Default stripe length = 5, parity length for RS code = 3
public static final int DEFAULT_STRIPE_LENGTH = 5;
@@ -126,6 +128,7 @@ public abstract class RaidNode implements RaidProtocol {
/** Deamon thread to trigger policies */
Daemon triggerThread = null;
+ public static long triggerMonitorSleepTime = SLEEP_TIME;
/** Deamon thread to delete obsolete parity files */
PurgeMonitor purgeMonitor = null;
@@ -299,6 +302,10 @@ private void initialize(Configuration conf)
this.blockFixer = BlockFixer.createBlockFixer(conf);
this.blockFixerThread = new Daemon(this.blockFixer);
this.blockFixerThread.start();
+ // read the sleep interval for the policy trigger daemon
+ RaidNode.triggerMonitorSleepTime = conf.getLong(
+ TRIGGER_MONITOR_SLEEP_TIME_KEY,
+ SLEEP_TIME);
// start the deamon thread to fire polcies appropriately
this.triggerThread = new Daemon(new TriggerMonitor());
@@ -503,7 +510,7 @@ private void doProcess() throws IOException, InterruptedException {
}
}
while (running) {
- Thread.sleep(SLEEP_TIME);
+ Thread.sleep(RaidNode.triggerMonitorSleepTime);
boolean reloaded = configMgr.reloadConfigsIfNecessary();
if (reloaded) {
@@ -542,7 +549,7 @@ private void doProcess() throws IOException, InterruptedException {
// Apply the action on accepted paths
LOG.info("Triggering Policy Action " + info.getName() +
- " " + info.getSrcPath());
+ " " + info.getSrcPath() + " raid " + filteredPaths.size() + " files");
try {
raidFiles(info, filteredPaths);
} catch (Exception e) {
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java
similarity index 96%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java
index 479043caad..55812101ca 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -296,9 +297,22 @@ public Path[] recover(String cmd, String argv[], int startindex)
for (int i = startindex; i < argv.length; i = i + 2) {
String path = argv[i];
long corruptOffset = Long.parseLong(argv[i+1]);
- LOG.debug("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
- paths[j] = new Path(raidnode.recoverFile(path, corruptOffset));
- LOG.debug("Raidshell created recovery file " + paths[j]);
+ LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
+ Path recovered = new Path("/tmp/recovered." + System.currentTimeMillis());
+ FileSystem fs = recovered.getFileSystem(conf);
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ Configuration raidConf = new Configuration(conf);
+ raidConf.set("fs.hdfs.impl",
+ "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+ raidConf.set("fs.raid.underlyingfs.impl",
+ "org.apache.hadoop.hdfs.DistributedFileSystem");
+ raidConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ java.net.URI dfsUri = dfs.getUri();
+ FileSystem raidFs = FileSystem.get(dfsUri, raidConf);
+ FileUtil.copy(raidFs, new Path(path), fs, recovered, false, conf);
+
+ paths[j] = recovered;
+ LOG.info("Raidshell created recovery file " + paths[j]);
j++;
}
return paths;
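
Editor's note: the rewritten recover() no longer asks the RaidNode to reconstruct the file; it copies it through a FileSystem configured with DistributedRaidFileSystem layered over DistributedFileSystem, so missing or corrupt blocks are rebuilt from parity on the fly during the read. A client can set up the same view with the configuration keys used above; in this hedged sketch the namenode URI and paths are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Sketch: read a raided file through the raid-aware filesystem.
public class RaidReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration raidConf = new Configuration();
    raidConf.set("fs.hdfs.impl",
        "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
    raidConf.set("fs.raid.underlyingfs.impl",
        "org.apache.hadoop.hdfs.DistributedFileSystem");
    raidConf.setBoolean("fs.hdfs.impl.disable.cache", true);

    // Placeholder URI and paths; substitute the cluster's namenode and file.
    FileSystem raidFs = FileSystem.get(new URI("hdfs://namenode:8020"), raidConf);
    FileSystem localFs = FileSystem.getLocal(raidConf);
    FileUtil.copy(raidFs, new Path("/user/foo/raided-file"),
        localFs, new Path("/tmp/recovered.copy"), false, raidConf);
  }
}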
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidUtils.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidUtils.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java
similarity index 54%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java
index 639e537369..0a5d91bf4d 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.raid;
+import java.util.Set;
+
public class ReedSolomonCode implements ErasureCode {
@@ -103,4 +105,79 @@ public int paritySize() {
public int symbolSize() {
return (int) Math.round(Math.log(GF.getFieldSize()) / Math.log(2));
}
+
+ /**
+ * Given parity symbols followed by message symbols, return the locations of
+ * symbols that are corrupted. Can resolve up to (parity length / 2) error
+ * locations.
+ * @param data The message and parity. The parity should be placed in the
+ * first part of the array. In each integer, the relevant portion
+ * is present in the least significant bits of each int.
+ * The number of elements in data is stripeSize() + paritySize().
+ * Note that data may be changed after calling this method.
+ * @param errorLocations The set to put the error location results
+ * @return true If the locations can be resolved, return true.
+ */
+ public boolean computeErrorLocations(int[] data,
+ Set<Integer> errorLocations) {
+ assert(data.length == paritySize + stripeSize && errorLocations != null);
+ errorLocations.clear();
+ int maxError = paritySize / 2;
+ int[][] syndromeMatrix = new int[maxError][];
+ for (int i = 0; i < syndromeMatrix.length; ++i) {
+ syndromeMatrix[i] = new int[maxError + 1];
+ }
+ int[] syndrome = new int[paritySize];
+
+ if (computeSyndrome(data, syndrome)) {
+ // Parity check OK. No error location added.
+ return true;
+ }
+ for (int i = 0; i < maxError; ++i) {
+ for (int j = 0; j < maxError + 1; ++j) {
+ syndromeMatrix[i][j] = syndrome[i + j];
+ }
+ }
+ GF.gaussianElimination(syndromeMatrix);
+ int[] polynomial = new int[maxError + 1];
+ polynomial[0] = 1;
+ for (int i = 0; i < maxError; ++i) {
+ polynomial[i + 1] = syndromeMatrix[maxError - 1 - i][maxError];
+ }
+ for (int i = 0; i < paritySize + stripeSize; ++i) {
+ int possibleRoot = GF.divide(1, primitivePower[i]);
+ if (GF.substitute(polynomial, possibleRoot) == 0) {
+ errorLocations.add(i);
+ }
+ }
+ // Now recover with error locations and check the syndrome again
+ int[] locations = new int[errorLocations.size()];
+ int k = 0;
+ for (int loc : errorLocations) {
+ locations[k++] = loc;
+ }
+ int [] erasedValue = new int[locations.length];
+ decode(data, locations, erasedValue);
+ for (int i = 0; i < locations.length; ++i) {
+ data[locations[i]] = erasedValue[i];
+ }
+ return computeSyndrome(data, syndrome);
+ }
+
+ /**
+ * Compute the syndrome of the input [parity, message]
+ * @param data [parity, message]
+ * @param syndrome The syndromes (checksums) of the data
+ * @return true If syndromes are all zeros
+ */
+ private boolean computeSyndrome(int[] data, int [] syndrome) {
+ boolean corruptionFound = false;
+ for (int i = 0; i < paritySize; i++) {
+ syndrome[i] = GF.substitute(data, primitivePower[i]);
+ if (syndrome[i] != 0) {
+ corruptionFound = true;
+ }
+ }
+ return !corruptionFound;
+ }
}
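
Editor's note: computeErrorLocations() builds a (paritySize/2) x (paritySize/2 + 1) syndrome matrix, reduces it with GF.gaussianElimination() to obtain the coefficients of an error-locator polynomial, marks every position whose candidate root evaluates the polynomial to zero, and finally re-decodes and re-checks the syndrome to confirm the guess. A hedged usage sketch follows; the stripe/parity sizes, the encode step and the injected corruption are illustrative, not taken from the patch.

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.raid.ReedSolomonCode;

// Sketch: locate a corrupted symbol in an encoded stripe.
public class ErrorLocationSketch {
  public static void main(String[] args) {
    int stripeSize = 10, paritySize = 4;               // illustrative sizes
    ReedSolomonCode code = new ReedSolomonCode(stripeSize, paritySize);

    int[] message = new int[stripeSize];
    for (int i = 0; i < stripeSize; i++) {
      message[i] = i + 1;                              // made-up message symbols
    }
    int[] parity = new int[paritySize];
    code.encode(message, parity);

    int[] data = new int[paritySize + stripeSize];     // layout: [parity | message]
    System.arraycopy(parity, 0, data, 0, paritySize);
    System.arraycopy(message, 0, data, paritySize, stripeSize);
    data[6] ^= 0x3;                                    // corrupt one symbol

    Set<Integer> errorLocations = new HashSet<Integer>();
    boolean resolved = code.computeErrorLocations(data, errorLocations);
    System.out.println("resolved=" + resolved + " locations=" + errorLocations);
  }
}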
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/XORDecoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/XORDecoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/XOREncoder.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/XOREncoder.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/PolicyList.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/PolicyList.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/start-raidnode-remote.sh
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode-remote.sh
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/start-raidnode-remote.sh
diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/start-raidnode.sh
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/bin/start-raidnode.sh
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/start-raidnode.sh
diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/stop-raidnode-remote.sh
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode-remote.sh
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/stop-raidnode-remote.sh
diff --git a/hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh b/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/stop-raidnode.sh
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/bin/stop-raidnode.sh
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/stop-raidnode.sh
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java
similarity index 99%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java
index ffdb4edfa0..36aab52ee5 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -47,8 +47,8 @@
public class TestRaidDfs extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
- final static String LOG_DIR = "/raidlog";
+ "target/test-data")).getAbsolutePath();
+ final static String LOG_DIR = "target/raidlog";
final static long RELOAD_INTERVAL = 1000;
final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
final static int NUM_DATANODES = 3;
@@ -414,6 +414,7 @@ public static boolean validateFile(FileSystem fileSys, Path name, long length,
LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
if (newcrc.getValue() != crc) {
LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
+ return false;
}
return true;
}
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidTestUtil.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java
similarity index 95%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java
index 10a7212645..8986bea8da 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java
@@ -26,12 +26,14 @@
import java.util.Random;
import java.util.zip.CRC32;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +41,8 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -53,9 +57,11 @@ public class TestBlockFixer {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestBlockFixer");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
+ public static final String DistBlockFixer_JAR =
+ JarFinder.getJar(DistBlockFixer.class);
final static long RELOAD_INTERVAL = 1000;
final static int NUM_DATANODES = 3;
Configuration conf;
@@ -546,6 +552,8 @@ protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
conf.setBoolean("dfs.permissions", false);
+ conf.set("mapreduce.framework.name", "yarn");
+
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
@@ -553,11 +561,28 @@ protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
FileSystem.setDefaultUri(conf, namenode);
mr = new MiniMRCluster(4, namenode, 3);
- jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ JobConf jobConf = mr.createJobConf();
+ jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
+ conf.set("mapreduce.framework.name", "yarn");
+ String rmAddress = jobConf.get("yarn.resourcemanager.address");
+ if (rmAddress != null) {
+ conf.set("yarn.resourcemanager.address", rmAddress);
+ }
+ String schedulerAddress =
+ jobConf.get("yarn.resourcemanager.scheduler.address");
+ if (schedulerAddress != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
+ }
+ String jobHistoryAddress =
+ jobConf.get("mapreduce.jobhistory.address");
+ if (jobHistoryAddress != null) {
+ conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+ }
+ conf.set(JobContext.JAR, TestBlockFixer.DistBlockFixer_JAR);
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("\n");
@@ -609,10 +634,11 @@ protected void myTearDown() throws Exception {
if (dfs != null) { dfs.shutdown(); }
}
- private long getCRC(FileSystem fs, Path p) throws IOException {
+ public static long getCRC(FileSystem fs, Path p) throws IOException {
CRC32 crc = new CRC32();
FSDataInputStream stm = fs.open(p);
- for (int b = 0; b > 0; b = stm.read()) {
+ int b;
+ while ((b = stm.read())>=0) {
crc.update(b);
}
stm.close();
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java
index 93658a5205..4ab1107c02 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -40,7 +40,7 @@ public class TestDirectoryTraversal extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestDirectoryTraversal");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
MiniDFSCluster dfs = null;
FileSystem fs = null;
@@ -211,7 +211,7 @@ private void createTestFile(Path file) throws IOException {
private void mySetup() throws IOException {
conf = new Configuration();
- dfs = new MiniDFSCluster(conf, 6, true, null);
+ dfs = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
dfs.waitActive();
fs = dfs.getFileSystem();
}
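
TestDirectoryTraversal now uses the MiniDFSCluster.Builder API instead of the old multi-argument constructor. A minimal sketch of the builder-based setup, assuming a hypothetical helper method around the calls used above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class MiniClusterSetup {
      public static FileSystem startDfs(Configuration conf) throws Exception {
        // Build a 3-datanode in-process HDFS cluster, as the updated mySetup() does.
        MiniDFSCluster dfs = new MiniDFSCluster.Builder(conf)
            .numDataNodes(3)
            .build();
        dfs.waitActive();   // block until the namenode sees all datanodes
        return dfs.getFileSystem();
      }
    }
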
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestErasureCodes.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java
similarity index 81%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestErasureCodes.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java
index d1d3f60f9e..6d60b574a3 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestErasureCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java
@@ -169,6 +169,57 @@ public void testXorPerformance() {
assertTrue("Decode failed", java.util.Arrays.equals(copy, message[0]));
}
+ public void testComputeErrorLocations() {
+ for (int i = 0; i < TEST_TIMES; ++i) {
+ verifyErrorLocations(10, 4, 1);
+ verifyErrorLocations(10, 4, 2);
+ }
+ }
+
+ public void verifyErrorLocations(int stripeSize, int paritySize, int errors) {
+ int[] message = new int[stripeSize];
+ int[] parity = new int[paritySize];
+ Set errorLocations = new HashSet();
+ for (int i = 0; i < message.length; ++i) {
+ message[i] = RAND.nextInt(256);
+ }
+ while (errorLocations.size() < errors) {
+ int loc = RAND.nextInt(stripeSize + paritySize);
+ errorLocations.add(loc);
+ }
+ ReedSolomonCode codec = new ReedSolomonCode(stripeSize, paritySize);
+ codec.encode(message, parity);
+ int[] data = combineArrays(parity, message);
+ for (Integer i : errorLocations) {
+ data[i] = randError(data[i]);
+ }
+ Set recoveredLocations = new HashSet();
+ boolean resolved = codec.computeErrorLocations(data, recoveredLocations);
+ if (resolved) {
+ assertEquals(errorLocations, recoveredLocations);
+ }
+ }
+
+ private int randError(int actual) {
+ while (true) {
+ int r = RAND.nextInt(256);
+ if (r != actual) {
+ return r;
+ }
+ }
+ }
+
+ private int[] combineArrays(int[] array1, int[] array2) {
+ int[] result = new int[array1.length + array2.length];
+ for (int i = 0; i < array1.length; ++i) {
+ result[i] = array1[i];
+ }
+ for (int i = 0; i < array2.length; ++i) {
+ result[i + array1.length] = array2[i];
+ }
+ return result;
+ }
+
private int[] randomErasedLocation(int erasedLen, int dataLen) {
int[] erasedLocations = new int[erasedLen];
for (int i = 0; i < erasedLen; i++) {
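
The new testComputeErrorLocations corrupts a few symbols of an encoded stripe and checks whether ReedSolomonCode can locate them. A minimal standalone sketch of the same drive loop, using only the ReedSolomonCode calls that appear in the hunk above (the demo class itself is illustrative):

    import java.util.HashSet;
    import java.util.Random;
    import java.util.Set;
    import org.apache.hadoop.raid.ReedSolomonCode;

    public class ErrorLocationDemo {
      public static void main(String[] args) {
        Random rand = new Random();
        int stripeSize = 10, paritySize = 4;
        int[] message = new int[stripeSize];
        int[] parity = new int[paritySize];
        for (int i = 0; i < stripeSize; i++) {
          message[i] = rand.nextInt(256);
        }
        ReedSolomonCode codec = new ReedSolomonCode(stripeSize, paritySize);
        codec.encode(message, parity);

        // Codeword layout used by the test: parity symbols first, then data symbols.
        int[] data = new int[paritySize + stripeSize];
        System.arraycopy(parity, 0, data, 0, paritySize);
        System.arraycopy(message, 0, data, paritySize, stripeSize);

        // Flip one symbol and ask the codec where the error is.
        int loc = rand.nextInt(data.length);
        data[loc] = (data[loc] + 1) % 256;
        Set<Integer> found = new HashSet<Integer>();
        if (codec.computeErrorLocations(data, found)) {
          System.out.println("expected {" + loc + "}, found " + found);
        }
      }
    }
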
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestGaloisField.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestGaloisField.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestGaloisField.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestGaloisField.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestHarIndexParser.java
similarity index 100%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestHarIndexParser.java
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java
index 4b1fe67205..037c682df3 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java
@@ -36,7 +36,7 @@
public class TestRaidFilter extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static Log LOG =
LogFactory.getLog("org.apache.hadoop.raid.TestRaidFilter");
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java
similarity index 92%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java
index 34398048b7..d3aeab7bff 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java
@@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.util.Random;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
/**
@@ -41,7 +43,7 @@
*/
public class TestRaidHar extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@@ -96,11 +98,27 @@ private void createClusters(boolean local) throws Exception {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
- jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ JobConf jobConf = mr.createJobConf();
+ jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
+ conf.set("mapreduce.framework.name", "yarn");
+ String rmAddress = jobConf.get("yarn.resourcemanager.address");
+ if (rmAddress != null) {
+ conf.set("yarn.resourcemanager.address", rmAddress);
+ }
+ String schedulerAddress =
+ jobConf.get("yarn.resourcemanager.scheduler.address");
+ if (schedulerAddress != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
+ }
+ String jobHistoryAddress =
+ jobConf.get("mapreduce.jobhistory.address");
+ if (jobHistoryAddress != null) {
+ conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+ }
}
/**
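
TestBlockFixer, TestRaidHar, TestRaidNode, and TestRaidPurge all copy the MiniMRCluster's ResourceManager, scheduler, and job-history endpoints into the test Configuration before starting RaidNode. The repeated block amounts to a helper like the following sketch (the helper class is hypothetical; the property names are the ones set above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    public class YarnTestConf {
      private static final String[] ADDRESS_KEYS = {
        "yarn.resourcemanager.address",
        "yarn.resourcemanager.scheduler.address",
        "mapreduce.jobhistory.address"
      };

      // Copy the endpoints chosen by MiniMRCluster into the configuration the
      // test hands to RaidNode, so submitted jobs reach the in-process daemons.
      public static void copyClusterAddresses(JobConf jobConf, Configuration conf) {
        conf.set("mapreduce.framework.name", "yarn");
        for (String key : ADDRESS_KEYS) {
          String value = jobConf.get(key);
          if (value != null) {
            conf.set(key, value);
          }
        }
      }
    }
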
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java
similarity index 88%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java
index 355d9ad79d..5bdee75068 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java
@@ -37,9 +37,13 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
@@ -49,7 +53,8 @@
*/
public class TestRaidNode extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
+ public static final String DistRaid_JAR = JarFinder.getJar(DistRaid.class);
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@@ -76,6 +81,8 @@ private void createClusters(boolean local) throws Exception {
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
conf.setBoolean("dfs.permissions.enabled", true);
+ conf.setLong(JobMonitor.JOBMONITOR_INTERVAL_KEY, 20000);
+ conf.setLong(RaidNode.TRIGGER_MONITOR_SLEEP_TIME_KEY, 3000L);
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
@@ -103,11 +110,27 @@ private void createClusters(boolean local) throws Exception {
namenode = fileSys.getUri().toString();
final int taskTrackers = 4;
mr = new MiniMRCluster(taskTrackers, namenode, 3);
- jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ JobConf jobConf = mr.createJobConf();
+ jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
+ conf.set("mapreduce.framework.name", "yarn");
+ String rmAddress = jobConf.get("yarn.resourcemanager.address");
+ if (rmAddress != null) {
+ conf.set("yarn.resourcemanager.address", rmAddress);
+ }
+ String schedulerAddress =
+ jobConf.get("yarn.resourcemanager.scheduler.address");
+ if (schedulerAddress != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
+ }
+ String jobHistoryAddress =
+ jobConf.get("mapreduce.jobhistory.address");
+ if (jobHistoryAddress != null) {
+ conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+ }
}
class ConfigBuilder {
@@ -238,9 +261,9 @@ public void testPathFilter() throws Exception {
LOG.info("Test testPathFilter started.");
long blockSizes [] = {1024L};
- long stripeLengths [] = {1, 2, 5, 6, 10, 11, 12};
- long targetReplication = 1;
- long metaReplication = 1;
+ int stripeLengths [] = {5, 6, 10, 11, 12};
+ int targetReplication = 1;
+ int metaReplication = 1;
int numBlock = 11;
int iter = 0;
@@ -284,7 +307,8 @@ private void doTestPathFilter(int iter, long targetReplication,
LOG.info("doTestPathFilter created test files for iteration " + iter);
// create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
+ Configuration localConf = new Configuration(conf);
+ cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
// wait till file is raided
@@ -314,7 +338,6 @@ private void doTestPathFilter(int iter, long targetReplication,
}
// assertEquals(listPaths.length, 1); // all files raided
LOG.info("doTestPathFilter all files found in Raid.");
- Thread.sleep(20000); // Without this wait, unit test crashes
// check for error at beginning of file
shell = new RaidShell(conf);
@@ -466,16 +489,23 @@ private void doCheckPolicy() throws Exception {
LOG.info("doCheckPolicy completed:");
}
- private void createTestFiles(String path, String destpath) throws IOException {
+ static public void createTestFiles(FileSystem fileSys,
+ String path, String destpath, int nfile,
+ int nblock) throws IOException {
+ createTestFiles(fileSys, path, destpath, nfile, nblock, (short)1);
+ }
+
+ static void createTestFiles(FileSystem fileSys, String path, String destpath, int nfile,
+ int nblock, short repl) throws IOException {
long blockSize = 1024L;
Path dir = new Path(path);
Path destPath = new Path(destpath);
fileSys.delete(dir, true);
fileSys.delete(destPath, true);
- for(int i = 0 ; i < 10; i++){
+ for (int i = 0; i < nfile; i++) {
Path file = new Path(path + "file" + i);
- createOldFile(fileSys, file, 1, 7, blockSize);
+ createOldFile(fileSys, file, repl, nblock, blockSize);
}
}
@@ -499,12 +529,15 @@ public void testDistRaid() throws Exception {
RaidNode cnode = null;
try {
- createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
- createTestFiles("/user/dhruba/raidtest2/", "/destraid/user/dhruba/raidtest2");
+ createTestFiles(fileSys, "/user/dhruba/raidtest/",
+ "/destraid/user/dhruba/raidtest", 5, 7);
+ createTestFiles(fileSys, "/user/dhruba/raidtest2/",
+ "/destraid/user/dhruba/raidtest2", 5, 7);
LOG.info("Test testDistRaid created test files");
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
cnode = RaidNode.createRaidNode(null, localConf);
// Verify the policies are parsed correctly
for (PolicyList policyList : cnode.getAllPolicies()) {
@@ -540,15 +573,13 @@ public void testDistRaid() throws Exception {
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
- assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
-
+
start = System.currentTimeMillis();
while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
- assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
-
+ assertEquals(dcnode.jobMonitor.jobsSucceeded(), dcnode.jobMonitor.jobsMonitored());
LOG.info("Test testDistRaid successful.");
} catch (Exception e) {
@@ -647,24 +678,19 @@ public void testSuspendTraversal() throws Exception {
RaidNode cnode = null;
try {
- createTestFiles(
- "/user/dhruba/raidtest/1/", "/destraid/user/dhruba/raidtest/1");
- createTestFiles(
- "/user/dhruba/raidtest/2/", "/destraid/user/dhruba/raidtest/2");
- createTestFiles(
- "/user/dhruba/raidtest/3/", "/destraid/user/dhruba/raidtest/3");
- createTestFiles(
- "/user/dhruba/raidtest/4/", "/destraid/user/dhruba/raidtest/4");
+ for (int i = 0; i < 4; i++) {
+ Path file = new Path("/user/dhruba/raidtest/dir" + i + "/file" + i);
+ createOldFile(fileSys, file, 1, 7, 1024L);
+ }
+
LOG.info("Test testSuspendTraversal created test files");
Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- localConf.setInt("raid.distraid.max.files", 3);
+ localConf.setInt("raid.distraid.max.jobs", 2);
+ localConf.setInt("raid.distraid.max.files", 2);
localConf.setInt("raid.directorytraversal.threads", 1);
- // This is too dependent on the implementation of getFilteredFiles().
- // It relies on the threading behavior where two directories are traversed
- // before returning because the list of files is modified in a separate
- // thread from the one that decides if there are enough files.
+ localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
+ // 4 test files: 2 jobs with 2 files each.
final int numJobsExpected = 2;
cnode = RaidNode.createRaidNode(null, localConf);
@@ -677,10 +703,20 @@ public void testSuspendTraversal() throws Exception {
start = System.currentTimeMillis();
while (dcnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
+ LOG.info("Waiting for num jobs succeeded " + dcnode.jobMonitor.jobsSucceeded() +
+ " to reach " + numJobsExpected);
+ Thread.sleep(3000);
+ }
+ // Wait for any running jobs to finish.
+ start = System.currentTimeMillis();
+ while (dcnode.jobMonitor.runningJobsCount() > 0 &&
+ System.currentTimeMillis() - start < MAX_WAITTIME) {
+ LOG.info("Waiting for zero running jobs: " +
+ dcnode.jobMonitor.runningJobsCount());
Thread.sleep(1000);
}
- assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
- assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+ assertEquals(numJobsExpected, dcnode.jobMonitor.jobsMonitored());
+ assertEquals(numJobsExpected, dcnode.jobMonitor.jobsSucceeded());
LOG.info("Test testSuspendTraversal successful.");
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java
similarity index 95%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java
index accef0b39a..ca6dc4a33d 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.raid.protocol.PolicyInfo;
/**
@@ -58,7 +59,7 @@
*/
public class TestRaidPurge extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@@ -113,11 +114,27 @@ private void createClusters(boolean local) throws Exception {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
- jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ JobConf jobConf = mr.createJobConf();
+ jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
+ conf.set("mapreduce.framework.name", "yarn");
+ String rmAddress = jobConf.get("yarn.resourcemanager.address");
+ if (rmAddress != null) {
+ conf.set("yarn.resourcemanager.address", rmAddress);
+ }
+ String schedulerAddress =
+ jobConf.get("yarn.resourcemanager.scheduler.address");
+ if (schedulerAddress != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
+ }
+ String jobHistoryAddress =
+ jobConf.get("mapreduce.jobhistory.address");
+ if (jobHistoryAddress != null) {
+ conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+ }
}
/**
@@ -235,6 +252,7 @@ private void doTestPurge(int iter, long targetReplication,
// create an instance of the RaidNode
Configuration localConf = new Configuration(conf);
+
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
@@ -299,7 +317,7 @@ public void testPurgeHar() throws Exception {
createClusters(true);
mySetup(1, 1, 5, harDelay);
Path dir = new Path("/user/dhruba/raidtest/");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ Path destPath = new Path("/raid/user/dhruba/raidtest");
Path file1 = new Path(dir + "/file");
RaidNode cnode = null;
try {
@@ -308,7 +326,6 @@ public void testPurgeHar() throws Exception {
// create an instance of the RaidNode
Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
cnode = RaidNode.createRaidNode(null, localConf);
// Wait till har is created.
@@ -334,14 +351,7 @@ public void testPurgeHar() throws Exception {
boolean found = false;
FileStatus[] listPaths = null;
while (!found || listPaths == null || listPaths.length > 1) {
- try {
- listPaths = fileSys.listStatus(destPath);
- } catch (FileNotFoundException e) {
- // If the parent directory is deleted because the har is deleted
- // and the parent is empty, try again.
- Thread.sleep(1000);
- continue;
- }
+ listPaths = fileSys.listStatus(destPath);
if (listPaths != null) {
for (FileStatus s: listPaths) {
LOG.info("testPurgeHar waiting for parity file to be recreated" +
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java
index 9375e405e1..c283ce996f 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java
@@ -47,7 +47,7 @@ public class TestRaidShell extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestRaidShell");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@@ -249,7 +249,8 @@ private void myTearDown() throws Exception {
private long getCRC(FileSystem fs, Path p) throws IOException {
CRC32 crc = new CRC32();
FSDataInputStream stm = fs.open(p);
- for (int b = 0; b > 0; b = stm.read()) {
+ int b;
+ while ((b = stm.read()) >= 0) {
crc.update(b);
}
stm.close();
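
The old getCRC loop, for (int b = 0; b > 0; b = stm.read()), never executes because its condition is false on entry, so the CRC was computed over zero bytes; the patched loop reads until read() returns -1 at end of file. A minimal standalone sketch of the corrected pattern, assuming a hypothetical utility class:

    import java.io.IOException;
    import java.util.zip.CRC32;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class Crc32Util {
      // Stream the file byte by byte and fold each byte into the CRC;
      // FSDataInputStream.read() returns -1 at end of file.
      public static long crcOf(FileSystem fs, Path p) throws IOException {
        CRC32 crc = new CRC32();
        FSDataInputStream in = fs.open(p);
        try {
          int b;
          while ((b = in.read()) >= 0) {
            crc.update(b);
          }
        } finally {
          in.close();
        }
        return crc.getValue();
      }
    }
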
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java
similarity index 99%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java
index 284162197b..fad14ead74 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
 * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -51,8 +51,8 @@ public class TestRaidShellFsck {
LogFactory.getLog("org.apache.hadoop.raid.TestRaidShellFsck");
final static String TEST_DIR =
new File(System.
- getProperty("test.build.data", "build/contrib/raid/test/data")).
- getAbsolutePath();
+ getProperty("test.build.data", "target/test-data")).getAbsolutePath();
+
final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").
getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
@@ -262,7 +262,7 @@ private void raidTestFiles(Path raidPath, Path[] filePaths, boolean doHar)
}
} else {
- // case without HAR
+ // case without HAR
for (FileStatus f : listPaths) {
Path found = new Path(f.getPath().toUri().getPath());
if (parityFilePath.equals(found)) {
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java
index 31704afbe6..5f47cee293 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java
@@ -42,7 +42,7 @@ public class TestReedSolomonDecoder extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestReedSolomonDecoder");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static int NUM_DATANODES = 3;
Configuration conf;
diff --git a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java
similarity index 98%
rename from hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
rename to hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java
index 9815591c54..bd1201413a 100644
--- a/hadoop-mapreduce-project/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java
@@ -49,7 +49,7 @@ public class TestReedSolomonEncoder extends TestCase {
final static Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.TestReedSolomonEncoder");
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/raid/test/data")).getAbsolutePath();
+ "target/test-data")).getAbsolutePath();
final static int NUM_DATANODES = 3;
Configuration conf;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index a6df657c8a..1e043f43fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -31,7 +31,7 @@
/** I-node for closed file. */
@InterfaceAudience.Private
-class INodeFile extends INode implements BlockCollection {
+public class INodeFile extends INode implements BlockCollection {
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
//Number of bits for Block size
diff --git a/hadoop-hdfs-project/pom.xml b/hadoop-hdfs-project/pom.xml
index 27161004a3..38c93fa8bd 100644
--- a/hadoop-hdfs-project/pom.xml
+++ b/hadoop-hdfs-project/pom.xml
@@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
hadoop-hdfs
hadoop-hdfs-httpfs
hadoop-hdfs/src/contrib/bkjournal
+ hadoop-hdfs-raid
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ce0e8e031c..04e09a6a6f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -118,6 +118,8 @@ Trunk (unreleased changes)
MAPREDUCE-3990. MRBench allows Long-sized input-lines value
but parses CLI argument as an Integer. (harsh)
+ MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen)
+
Branch-2 ( Unreleased changes )
INCOMPATIBLE CHANGES
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 5378c61911..ace8f75f99 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -257,6 +257,12 @@
hadoop-client
${project.version}
+
+ org.apache.hadoop
+ hadoop-hdfs-raid
+ ${project.version}
+
+
org.apache.hadoop
hadoop-minicluster