HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with length less than the first internal block. Contributed by Jing Zhao.

Jing Zhao 2016-01-22 09:46:02 -08:00
parent 34a3900773
commit 95363bcc7d
8 changed files with 270 additions and 133 deletions

View File: DFSInputStream.java

@@ -32,7 +32,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -909,7 +908,8 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
     }
   }
-  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
+      int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -959,7 +959,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, in
         // Check if need to report block replicas corruption either read
         // was successful or ChecksumException occured.
         reportCheckSumFailure(corruptedBlockMap,
-            currentLocatedBlock.getLocations().length);
+            currentLocatedBlock.getLocations().length, false);
       }
     }
   }
@@ -1492,7 +1492,8 @@ private int pread(long position, byte[] buffer, int offset, int length)
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
         // corrupted.
-        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length,
+            false);
       }
       remaining -= bytesToRead;
@@ -1508,6 +1509,7 @@ private int pread(long position, byte[] buffer, int offset, int length)
   /**
    * DFSInputStream reports checksum failure.
+   * For replicated blocks, we have the following logic:
    * Case I : client has tried multiple data nodes and at least one of the
    * attempts has succeeded. We report the other failures as corrupted block to
    * namenode.
@@ -1515,29 +1517,39 @@ private int pread(long position, byte[] buffer, int offset, int length)
    * only report if the total number of replica is 1. We do not
    * report otherwise since this maybe due to the client is a handicapped client
    * (who can not read).
+   *
+   * For erasure-coded blocks, each block in corruptedBlockMap is an internal
+   * block in a block group, and there is usually only one DataNode
+   * corresponding to each internal block. For this case we simply report the
+   * corrupted blocks to NameNode and ignore the above logic.
+   *
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
   protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      int dataNodeCount) {
+      int dataNodeCount, boolean isStriped) {
     if (corruptedBlockMap.isEmpty()) {
       return;
     }
-    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
-        .entrySet().iterator();
-    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
-    ExtendedBlock blk = entry.getKey();
-    Set<DatanodeInfo> dnSet = entry.getValue();
-    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
-        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
-      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
-      int i = 0;
-      for (DatanodeInfo dn:dnSet) {
-        locs[i++] = dn;
-      }
-      LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
-      dfsClient.reportChecksumFailure(src, lblocks);
+    List<LocatedBlock> reportList = new ArrayList<>(corruptedBlockMap.size());
+    for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+        corruptedBlockMap.entrySet()) {
+      ExtendedBlock blk = entry.getKey();
+      Set<DatanodeInfo> dnSet = entry.getValue();
+      if (isStriped || ((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+          || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+        DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+        int i = 0;
+        for (DatanodeInfo dn:dnSet) {
+          locs[i++] = dn;
+        }
+        reportList.add(new LocatedBlock(blk, locs));
+      }
+    }
+    if (reportList.size() > 0) {
+      dfsClient.reportChecksumFailure(src,
+          reportList.toArray(new LocatedBlock[reportList.size()]));
     }
     corruptedBlockMap.clear();
   }
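
The rule spelled out in the javadoc above can be condensed into a small standalone sketch (not HDFS code; the class and method names are made up): striped reads report every corrupted internal block, while replicated reads keep the original "some but not all replicas failed, or there is only a single replica" test.

    // Sketch only, assuming the semantics described in the javadoc above.
    class ChecksumReportRule {
      static boolean shouldReport(boolean isStriped, int corruptReplicas, int dataNodeCount) {
        if (isStriped) {
          return true;  // usually one DataNode per internal block, so always report
        }
        return (corruptReplicas > 0 && corruptReplicas < dataNodeCount)
            || (dataNodeCount == 1 && corruptReplicas == dataNodeCount);
      }

      public static void main(String[] args) {
        System.out.println(shouldReport(true, 1, 1));   // true: striped internal block
        System.out.println(shouldReport(false, 1, 3));  // true: 1 of 3 replicas corrupt
        System.out.println(shouldReport(false, 3, 3));  // false: the client itself may be at fault
      }
    }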

View File: DFSStripedInputStream.java

@@ -451,7 +451,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy,
         // Check if need to report block replicas corruption either read
         // was successful or ChecksumException occured.
         reportCheckSumFailure(corruptedBlockMap,
-            currentLocatedBlock.getLocations().length);
+            currentLocatedBlock.getLocations().length, true);
       }
     }
     return -1;

View File: StripedBlockUtil.java

@@ -22,7 +22,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -35,6 +34,8 @@
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.nio.ByteBuffer;
 import java.util.*;
@@ -72,6 +73,8 @@
 @InterfaceAudience.Private
 public class StripedBlockUtil {

+  public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+
   /**
    * This method parses a striped block group into individual blocks.
    *
@@ -221,15 +224,11 @@ public static StripingChunkReadResult getNextCompletedStripedRead(
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.CANCELLED);
     }
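
The guard removal above relies on a standard SLF4J property: Logger.debug(String, Throwable) is effectively a no-op when the DEBUG level is disabled, so an explicit isDebugEnabled() check only pays off when building the message itself is expensive. A minimal sketch of the two equivalent patterns (the class here is illustrative, not HDFS code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingGuardSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingGuardSketch.class);

      void handle(Exception e) {
        // Old pattern: explicit guard, useful only when the message is costly to build.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Exception during striped read task", e);
        }
        // New pattern: both arguments already exist, so the unguarded call is
        // just as cheap when DEBUG is off.
        LOG.debug("Exception during striped read task", e);
      }
    }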

View File: CHANGES.txt

@@ -406,6 +406,9 @@ Trunk (Unreleased)
     HDFS-9615. Fix variable name typo in DFSConfigKeys. (Ray Chiang via
     Arpit Agarwal)

+    HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with
+    length less than the first internal block. (jing9)
+
   BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS

     HDFS-7347. Configurable erasure coding policy for individual files and

View File: ErasureCodingWorker.java

@@ -32,8 +32,10 @@
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -46,6 +48,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,9 +124,8 @@ private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
   }

   private void initializeStripedReadThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped reads; pool threads=" + num);
-    }
+    LOG.debug("Using striped reads; pool threads=" + num);
     STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -148,9 +150,7 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
   }

   private void initializeStripedBlkRecoveryThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped block recovery; pool threads=" + num);
-    }
+    LOG.debug("Using striped block recovery; pool threads=" + num);
     STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -368,11 +368,11 @@ private long getBlockLen(ExtendedBlock blockGroup, int i) {
    * @return StripedReader
    */
   private StripedReader addStripedReader(int i, long offsetInBlock) {
-    StripedReader reader = new StripedReader(liveIndices[i]);
+    final ExtendedBlock block = getBlock(blockGroup, liveIndices[i]);
+    StripedReader reader = new StripedReader(liveIndices[i], block, sources[i]);
     stripedReaders.add(reader);

-    BlockReader blockReader = newBlockReader(
-        getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+    BlockReader blockReader = newBlockReader(block, offsetInBlock, sources[i]);
     if (blockReader != null) {
       initChecksumAndBufferSizeIfNeeded(blockReader);
       reader.blockReader = blockReader;
@@ -435,19 +435,27 @@ public void run() {
         throw new IOException(error);
       }

-      long firstStripedBlockLength = getBlockLen(blockGroup, 0);
-      while (positionInBlock < firstStripedBlockLength) {
-        int toRead = Math.min(
-            bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+      long maxTargetLength = 0;
+      for (short targetIndex : targetIndices) {
+        maxTargetLength = Math.max(maxTargetLength,
+            getBlockLen(blockGroup, targetIndex));
+      }
+      while (positionInBlock < maxTargetLength) {
+        final int toRecover = (int) Math.min(
+            bufferSize, maxTargetLength - positionInBlock);
         // step1: read from minimum source DNs required for reconstruction.
         // The returned success list is the source DNs we do real read from
-        success = readMinimumStripedData4Recovery(success);
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
+        try {
+          success = readMinimumStripedData4Recovery(success, toRecover,
+              corruptionMap);
+        } finally {
+          // report corrupted blocks to NN
+          reportCorruptedBlocks(corruptionMap);
+        }

         // step2: decode to reconstruct targets
-        long remaining = firstStripedBlockLength - positionInBlock;
-        int toRecoverLen = remaining < bufferSize ?
-            (int)remaining : bufferSize;
-        recoverTargets(success, targetsStatus, toRecoverLen);
+        recoverTargets(success, targetsStatus, toRecover);

         // step3: transfer data
         if (transferData2Targets(targetsStatus) == 0) {
@@ -456,7 +464,7 @@ public void run() {
         }

         clearBuffers();
-        positionInBlock += toRead;
+        positionInBlock += toRecover;
       }

       endTargetBlocks(targetsStatus);
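
The change to the loop bound above is the core of HDFS-9646: the old code ran while positionInBlock was below the length of internal block 0, which can be longer than the blocks actually being reconstructed in a partial block group. A small self-contained sketch of the new bound, with hypothetical lengths (this is not the HDFS implementation, just the arithmetic it performs):

    // Sketch only: the recovery loop bound comes from the target blocks, not block 0.
    final class RecoveryBoundSketch {
      static long maxTargetLength(long[] internalBlockLens, short[] targetIndices) {
        long max = 0;
        for (short idx : targetIndices) {
          max = Math.max(max, internalBlockLens[idx]);
        }
        return max;
      }

      public static void main(String[] args) {
        // Hypothetical partial RS-6-3 block group with 64KB cells: internal block 0
        // and the parity blocks hold one full cell, data blocks 1-5 are empty.
        long[] lens = {65536, 0, 0, 0, 0, 0, 65536, 65536, 65536};
        short[] targets = {1};  // reconstructing data block 1 only
        System.out.println(maxTargetLength(lens, targets));  // 0: the loop never runs
        // The old bound (length of block 0 == 65536) would have scheduled reads and a
        // decode for a target that has nothing to recover.
      }
    }
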
@@ -513,10 +521,11 @@ private void getTargetIndices() {
     }
   }

-  private long getReadLength(int index) {
+  /** the reading length should not exceed the length for recovery */
+  private int getReadLength(int index, int recoverLength) {
     long blockLen = getBlockLen(blockGroup, index);
     long remaining = blockLen - positionInBlock;
-    return remaining > bufferSize ? bufferSize : remaining;
+    return (int) Math.min(remaining, recoverLength);
   }

   /**
@@ -529,11 +538,15 @@ private long getReadLength(int index) {
    * operations and next iteration read.
    *
    * @param success the initial success list of source DNs we think best
+   * @param recoverLength the length to recover.
    * @return updated success list of source DNs we do real read
    * @throws IOException
    */
-  private int[] readMinimumStripedData4Recovery(final int[] success)
+  private int[] readMinimumStripedData4Recovery(final int[] success,
+      int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
       throws IOException {
+    Preconditions.checkArgument(recoverLength >= 0 &&
+        recoverLength <= bufferSize);
     int nsuccess = 0;
     int[] newSuccess = new int[minRequiredSources];
     BitSet used = new BitSet(sources.length);
@@ -543,9 +556,11 @@ private int[] readMinimumStripedData4Recovery(final int[] success)
      */
     for (int i = 0; i < minRequiredSources; i++) {
       StripedReader reader = stripedReaders.get(success[i]);
-      if (getReadLength(liveIndices[success[i]]) > 0) {
-        Callable<Void> readCallable = readFromBlock(
-            reader.blockReader, reader.buffer);
+      final int toRead = getReadLength(liveIndices[success[i]],
+          recoverLength);
+      if (toRead > 0) {
+        Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+            toRead, corruptionMap);
         Future<Void> f = readService.submit(readCallable);
         futures.put(f, success[i]);
       } else {
@@ -570,10 +585,10 @@ private int[] readMinimumStripedData4Recovery(final int[] success)
           StripedReader failedReader = stripedReaders.get(result.index);
           closeBlockReader(failedReader.blockReader);
           failedReader.blockReader = null;
-          resultIndex = scheduleNewRead(used);
+          resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
         } else if (result.state == StripingChunkReadResult.TIMEOUT) {
           // If timeout, we also schedule a new read.
-          resultIndex = scheduleNewRead(used);
+          resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
         }
         if (resultIndex >= 0) {
           newSuccess[nsuccess++] = resultIndex;
@@ -601,6 +616,9 @@ private int[] readMinimumStripedData4Recovery(final int[] success)
   }

   private void paddingBufferToLen(ByteBuffer buffer, int len) {
+    if (len > buffer.limit()) {
+      buffer.limit(len);
+    }
     int toPadding = len - buffer.position();
     for (int i = 0; i < toPadding; i++) {
       buffer.put((byte) 0);
@@ -648,8 +666,8 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
     int m = 0;
     for (int i = 0; i < targetBuffers.length; i++) {
       if (targetsStatus[i]) {
+        targetBuffers[i].limit(toRecoverLen);
         outputs[m++] = targetBuffers[i];
-        outputs[i].limit(toRecoverLen);
       }
     }
     decoder.decode(inputs, erasedIndices, outputs);
@@ -658,7 +676,7 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
       if (targetsStatus[i]) {
         long blockLen = getBlockLen(blockGroup, targetIndices[i]);
         long remaining = blockLen - positionInBlock;
-        if (remaining < 0) {
+        if (remaining <= 0) {
           targetBuffers[i].limit(0);
         } else if (remaining < toRecoverLen) {
           targetBuffers[i].limit((int)remaining);
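
The two buffer adjustments above work together: the output limit is now set on targetBuffers[i] before the buffer is appended to the compacted outputs array (the old outputs[i] indexing could touch the wrong slot once earlier targets were skipped), and a target whose length has already been reached gets a zero limit (remaining <= 0 rather than < 0). A small sketch of the resulting limit rule, using hypothetical values:

    import java.nio.ByteBuffer;

    // Sketch only: how many decoded bytes a target buffer should accept this iteration.
    class TargetLimitSketch {
      static int targetLimit(long blockLen, long positionInBlock, int toRecoverLen) {
        long remaining = blockLen - positionInBlock;
        if (remaining <= 0) {
          return 0;  // this target block is already complete
        }
        return (int) Math.min(remaining, toRecoverLen);
      }

      public static void main(String[] args) {
        ByteBuffer target = ByteBuffer.allocate(64 * 1024);
        // Hypothetical: a 10KB target block, recovery position already at 10KB,
        // 64KB recovery window; nothing more may be written into this target.
        target.limit(targetLimit(10 * 1024, 10 * 1024, 64 * 1024));
        System.out.println(target.limit());  // 0
      }
    }
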
@@ -678,16 +696,19 @@ private void recoverTargets(int[] success, boolean[] targetsStatus,
    * @param used the used source DNs in this iteration.
    * @return the array index of source DN if don't need to do real read.
    */
-  private int scheduleNewRead(BitSet used) {
+  private int scheduleNewRead(BitSet used, int recoverLength,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
     StripedReader reader = null;
     // step1: initially we may only have <code>minRequiredSources</code>
     // number of StripedReader, and there may be some source DNs we never
     // read before, so will try to create StripedReader for one new source DN
     // and try to read from it. If found, go to step 3.
     int m = stripedReaders.size();
+    int toRead = 0;
     while (reader == null && m < sources.length) {
       reader = addStripedReader(m, positionInBlock);
-      if (getReadLength(liveIndices[m]) > 0) {
+      toRead = getReadLength(liveIndices[m], recoverLength);
+      if (toRead > 0) {
         if (reader.blockReader == null) {
           reader = null;
           m++;
@@ -706,12 +727,14 @@ private int scheduleNewRead(BitSet used) {
     for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
       if (!used.get(i)) {
         StripedReader r = stripedReaders.get(i);
-        if (getReadLength(liveIndices[i]) > 0) {
+        toRead = getReadLength(liveIndices[i], recoverLength);
+        if (toRead > 0) {
           closeBlockReader(r.blockReader);
           r.blockReader = newBlockReader(
               getBlock(blockGroup, liveIndices[i]), positionInBlock,
               sources[i]);
           if (r.blockReader != null) {
+            r.buffer.position(0);
             m = i;
             reader = r;
           }
@@ -725,8 +748,8 @@ private int scheduleNewRead(BitSet used) {
     // step3: schedule if find a correct source DN and need to do real read.
     if (reader != null) {
-      Callable<Void> readCallable = readFromBlock(
-          reader.blockReader, reader.buffer);
+      Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+          toRead, corruptionMap);
       Future<Void> f = readService.submit(readCallable);
       futures.put(f, m);
       used.set(m);
@@ -742,15 +765,22 @@ private void cancelReads(Collection<Future<Void>> futures) {
     }
   }

-  private Callable<Void> readFromBlock(final BlockReader reader,
-      final ByteBuffer buf) {
+  private Callable<Void> readFromBlock(final StripedReader reader,
+      final ByteBuffer buf, final int length,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
     return new Callable<Void>() {

       @Override
       public Void call() throws Exception {
         try {
-          actualReadFromBlock(reader, buf);
+          buf.limit(length);
+          actualReadFromBlock(reader.blockReader, buf);
           return null;
+        } catch (ChecksumException e) {
+          LOG.warn("Found Checksum error for " + reader.block + " from "
+              + reader.source + " at " + e.getPos());
+          addCorruptedBlock(reader.block, reader.source, corruptionMap);
+          throw e;
         } catch (IOException e) {
           LOG.info(e.getMessage());
           throw e;
@@ -760,6 +790,30 @@ public Void call() throws Exception {
     };
   }

+  private void reportCorruptedBlocks(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
+    if (!corruptionMap.isEmpty()) {
+      for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+          corruptionMap.entrySet()) {
+        for (DatanodeInfo dnInfo : entry.getValue()) {
+          datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
+        }
+      }
+    }
+  }
+
+  private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+    Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+    if (dnSet == null) {
+      dnSet = new HashSet<>();
+      corruptionMap.put(blk, dnSet);
+    }
+    if (!dnSet.contains(node)) {
+      dnSet.add(node);
+    }
+  }
+
   /**
    * Read bytes from block
    */
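
The helpers added above are a plain accumulate-then-flush pattern: checksum failures observed inside the read callables are collected per block, and reported once per read step in the finally clause shown earlier. A compact sketch of the same pattern (names are illustrative, not the HDFS API); HDFS spells the accumulation out with an explicit null check, and computeIfAbsent is an equivalent shorthand:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class CorruptionMapSketch {
      // Accumulate: one entry per corrupted block, with the set of nodes it was read from.
      static <B, N> void addCorrupted(B block, N node, Map<B, Set<N>> corruptionMap) {
        corruptionMap.computeIfAbsent(block, k -> new HashSet<>()).add(node);
      }

      public static void main(String[] args) {
        Map<String, Set<String>> corruptionMap = new HashMap<>();
        addCorrupted("blk_1001", "dn-3", corruptionMap);
        addCorrupted("blk_1001", "dn-3", corruptionMap);  // duplicate is absorbed by the Set
        addCorrupted("blk_1002", "dn-7", corruptionMap);
        // Flush: report every (block, node) pair, then discard the map.
        corruptionMap.forEach((blk, nodes) ->
            nodes.forEach(dn -> System.out.println("report " + blk + " on " + dn)));
      }
    }
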
@@ -900,14 +954,14 @@ private void clearBuffers() {
     }

     if (zeroStripeBuffers != null) {
-      for (int i = 0; i < zeroStripeBuffers.length; i++) {
-        zeroStripeBuffers[i].clear();
+      for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+        zeroStripeBuffer.clear();
       }
     }

-    for (int i = 0; i < targetBuffers.length; i++) {
-      if (targetBuffers[i] != null) {
-        targetBuffers[i].clear();
+    for (ByteBuffer targetBuffer : targetBuffers) {
+      if (targetBuffer != null) {
+        targetBuffer.clear();
       }
     }
   }
@@ -998,9 +1052,13 @@ private static class StripedReader {
     private final short index; // internal block index
     private BlockReader blockReader;
     private ByteBuffer buffer;
+    private final ExtendedBlock block;
+    private final DatanodeInfo source;

-    private StripedReader(short index) {
+    StripedReader(short index, ExtendedBlock block, DatanodeInfo source) {
       this.index = index;
+      this.block = block;
+      this.source = source;
     }
   }
 }

View File: BlockECRecoveryCommand.java

@@ -136,7 +136,7 @@ public String toString() {
         .append("Recovering ").append(block).append(" From: ")
         .append(Arrays.asList(sources)).append(" To: [")
         .append(Arrays.asList(targets)).append(")\n")
-        .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
+        .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices))
         .toString();
   }
 }
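
The one-line change above fixes how liveBlockIndices is printed: Arrays.asList applied to a primitive array cannot box the elements, so it produces a single-element List containing the array object itself, and toString prints an array reference rather than the values. Arrays.toString prints the elements. A minimal illustration (the sample indices are made up):

    import java.util.Arrays;

    class ArrayToStringDemo {
      public static void main(String[] args) {
        short[] liveBlockIndices = {0, 2, 3, 6, 7, 8};
        // One-element List<short[]>; prints something like [[S@1b6d3586]
        System.out.println(Arrays.asList(liveBlockIndices));
        // Prints the values: [0, 2, 3, 6, 7, 8]
        System.out.println(Arrays.toString(liveBlockIndices));
      }
    }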

View File: TestReadStripedFileWithDecoding.java

@@ -19,6 +19,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
@@ -29,12 +30,16 @@
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +56,14 @@
 public class TestReadStripedFileWithDecoding {
   static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);

+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+  }
+
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -66,9 +79,9 @@ public void setup() throws IOException {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
-        .numDataNodes(numDNs).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
   }

View File: TestRecoverStripedFile.java

@@ -23,11 +23,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Random;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,17 +66,25 @@ public class TestRecoverStripedFile {
   static {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
   }

+  enum RecoveryType {
+    DataOnly,
+    ParityOnly,
+    Any
+  }
+
   private MiniDFSCluster cluster;
-  private Configuration conf;
   private DistributedFileSystem fs;
   // Map: DatanodeID -> datanode index in cluster
-  private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+  private Map<DatanodeID, Integer> dnMap = new HashMap<>();
+  private final Random random = new Random();

   @Before
   public void setup() throws IOException {
-    conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
         cellSize - 1);
@@ -104,73 +114,138 @@ public void tearDown() {
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen,
+        RecoveryType.ParityOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverOneParityBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen,
+        RecoveryType.ParityOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverOneParityBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen,
+        RecoveryType.ParityOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverOneParityBlock3() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen,
+        RecoveryType.ParityOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverThreeParityBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen,
+        RecoveryType.ParityOnly, 3);
   }

   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen,
+        RecoveryType.DataOnly, 3);
   }

   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks1() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen,
+        RecoveryType.DataOnly, 3);
   }

   @Test(timeout = 120000)
   public void testRecoverOneDataBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen,
+        RecoveryType.DataOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverOneDataBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen,
+        RecoveryType.DataOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverOneDataBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen,
+        RecoveryType.DataOnly, 1);
   }

   @Test(timeout = 120000)
   public void testRecoverAnyBlocks() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen,
+        RecoveryType.Any, 2);
   }

   @Test(timeout = 120000)
   public void testRecoverAnyBlocks1() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen,
+        RecoveryType.Any, 3);
+  }
+
+  private int[] generateDeadDnIndices(RecoveryType type, int deadNum,
+      byte[] indices) {
+    List<Integer> deadList = new ArrayList<>(deadNum);
+    while (deadList.size() < deadNum) {
+      int dead = random.nextInt(indices.length);
+      boolean isOfType = true;
+      if (type == RecoveryType.DataOnly) {
+        isOfType = indices[dead] < dataBlkNum;
+      } else if (type == RecoveryType.ParityOnly) {
+        isOfType = indices[dead] >= dataBlkNum;
+      }
+      if (isOfType && !deadList.contains(dead)) {
+        deadList.add(dead);
+      }
+    }
+    int[] d = new int[deadNum];
+    for (int i = 0; i < deadNum; i++) {
+      d[i] = deadList.get(i);
+    }
+    return d;
+  }
+
+  private void shutdownDataNodes(DataNode dn) throws IOException {
+    /*
+     * Kill the datanode which contains one replica
+     * We need to make sure it dead in namenode: clear its update time and
+     * trigger NN to check heartbeat.
+     */
+    dn.shutdown();
+    cluster.setDataNodeDead(dn.getDatanodeId());
+  }
+
+  private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
+      RecoveryType type) throws IOException {
+    int stoppedDN = 0;
+    for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
+      if (stoppedDN == 0 || type != RecoveryType.DataOnly
+          || random.nextBoolean()) {
+        // stop at least one DN to trigger recovery
+        LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        shutdownDataNodes(target.getValue());
+        stoppedDN++;
+      } else { // corrupt the data on the DN
+        LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        cluster.corruptReplica(target.getValue(), target.getKey());
+      }
+    }
+    return stoppedDN;
   }

   /**
@@ -180,11 +255,7 @@ public void testRecoverAnyBlocks1() throws Exception {
    * 2. Read the file and verify content.
    */
   private void assertFileBlocksRecovery(String fileName, int fileLen,
-      int recovery, int toRecoverBlockNum) throws Exception {
-    if (recovery != 0 && recovery != 1 && recovery != 2) {
-      Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
-          + "1 is to recovery data blocks, 2 is any.");
-    }
+      RecoveryType type, int toRecoverBlockNum) throws Exception {
     if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
       Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
     }
@@ -192,7 +263,7 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
     Path file = new Path(fileName);

     final byte[] data = new byte[fileLen];
-    ThreadLocalRandom.current().nextBytes(data);
+    Arrays.fill(data, (byte) 1);
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
@@ -210,25 +281,9 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
       bitset.set(dnMap.get(storageInfo));
     }

-    int[] toDead = new int[toRecoverBlockNum];
-    int n = 0;
-    for (int i = 0; i < indices.length; i++) {
-      if (n < toRecoverBlockNum) {
-        if (recovery == 0) {
-          if (indices[i] >= dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else if (recovery == 1) {
-          if (indices[i] < dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else {
-          toDead[n++] = i;
-        }
-      } else {
-        break;
-      }
-    }
+    int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
+    LOG.info("Note: indices == " + Arrays.toString(indices)
+        + ". Generate errors on datanodes: " + Arrays.toString(dead));

     DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
     int[] deadDnIndices = new int[toRecoverBlockNum];
@@ -236,46 +291,41 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
     File[] replicas = new File[toRecoverBlockNum];
     File[] metadatas = new File[toRecoverBlockNum];
     byte[][] replicaContents = new byte[toRecoverBlockNum][];
+    Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);
     for (int i = 0; i < toRecoverBlockNum; i++) {
-      dataDNs[i] = storageInfos[toDead[i]];
+      dataDNs[i] = storageInfos[dead[i]];
       deadDnIndices[i] = dnMap.get(dataDNs[i]);

       // Check the block replica file on deadDn before it dead.
       blocks[i] = StripedBlockUtil.constructInternalBlock(
-          lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+          lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]);
+      errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i]));
       replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
       metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
       // the block replica on the datanode should be the same as expected
       assertEquals(replicas[i].length(),
           StripedBlockUtil.getInternalBlockLength(
-          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]));
       assertTrue(metadatas[i].getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      LOG.info("replica " + i + " locates in file: " + replicas[i]);
       replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
     }

     int cellsNum = (fileLen - 1) / cellSize + 1;
     int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;

-    for (int i = 0; i < toRecoverBlockNum; i++) {
-      /*
-       * Kill the datanode which contains one replica
-       * We need to make sure it dead in namenode: clear its update time and
-       * trigger NN to check heartbeat.
-       */
-      DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
-      dn.shutdown();
-      cluster.setDataNodeDead(dn.getDatanodeId());
-    }
+    // shutdown datanodes or generate corruption
+    int stoppedDN = generateErrors(errorMap, type);

     // Check the locatedBlocks of the file again
     locatedBlocks = getLocatedBlocks(file);
     lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
     storageInfos = lastBlock.getLocations();
-    assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+    assertEquals(storageInfos.length, groupSize - stoppedDN);

     int[] targetDNs = new int[dnNum - groupSize];
-    n = 0;
+    int n = 0;
     for (int i = 0; i < dnNum; i++) {
       if (!bitset.get(i)) { // not contain replica of the block.
         targetDNs[n++] = i;
@@ -289,9 +339,11 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
     // Check the replica on the new target node.
     for (int i = 0; i < toRecoverBlockNum; i++) {
       File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+      LOG.info("replica after recovery " + replicaAfterRecovery);
       File metadataAfterRecovery =
           cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
       assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+      LOG.info("replica before " + replicas[i]);
       assertTrue(metadataAfterRecovery.getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
       byte[] replicaContentAfterRecovery =
@@ -366,7 +418,7 @@ public void testProcessErasureCodingTasksSubmitionShouldSucceed()
     BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
         new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
         ErasureCodingPolicyManager.getSystemDefaultPolicy());
-    List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
+    List<BlockECRecoveryInfo> ecTasks = new ArrayList<>();
     ecTasks.add(invalidECInfo);
     dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
   }