HDDS-419. ChunkInputStream bulk read api does not read from all the chunks. Contributed by Lokesh Jain and Mukul Kumar.
commit 6f037468bc
parent 488806baca
@@ -121,12 +121,17 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
       return 0;
     }
     checkOpen();
+    int total = 0;
+    while (len > 0) {
       int available = prepareRead(len);
       if (available == EOF) {
-        return EOF;
+        return total != 0 ? total : EOF;
       }
-    buffers.get(bufferIndex).get(b, off, available);
-    return available;
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
+    }
+    return total;
   }

   @Override
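The core of the fix is the accumulation loop above: the old read() copied only from the buffers that prepareRead() had staged for the current chunk and returned immediately, so a request spanning a chunk boundary came back short. Below is a minimal, self-contained sketch of the same accumulate-until-satisfied pattern over a plain java.io.InputStream; the class and method names are illustrative, not Ozone APIs.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class BulkReadLoop {
  private static final int EOF = -1;

  // Same pattern as the fixed read(): keep copying until the request is
  // satisfied, and only report EOF when nothing was read at all.
  static int readBulk(InputStream in, byte[] b, int off, int len)
      throws IOException {
    int total = 0;
    while (len > 0) {
      int available = in.read(b, off + total, len); // stands in for prepareRead
      if (available == EOF) {
        return total != 0 ? total : EOF;
      }
      len -= available;
      total += available;
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = new byte[100];
    byte[] buf = new byte[100];
    int n = readBulk(new ByteArrayInputStream(data), buf, 0, buf.length);
    System.out.println("bytes read: " + n); // 100, not just the first fill
  }
}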
@@ -196,13 +201,20 @@ private synchronized void readChunkFromContainer() throws IOException {
     // next chunk
     chunkIndex += 1;
     final ReadChunkResponseProto readChunkResponse;
+    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
     try {
-      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(chunkIndex), blockID, traceID);
+      readChunkResponse = ContainerProtocolCalls
+          .readChunk(xceiverClient, chunkInfo, blockID, traceID);
     } catch (IOException e) {
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
     ByteString byteString = readChunkResponse.getData();
+    if (byteString.size() != chunkInfo.getLen()) {
+      // Bytes read from chunk should be equal to chunk size.
+      throw new IOException(String
+          .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+              chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
+    }
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
   }
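The new length check guards the protobuf payload before it is sliced into buffers. Here is a small sketch of that validate-then-slice pattern with com.google.protobuf.ByteString; the declared length and payload are made-up values for illustration.

import com.google.protobuf.ByteString;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

public final class ChunkPayloadCheck {
  public static void main(String[] args) throws IOException {
    long declaredLen = 16; // e.g. what ChunkInfo.getLen() reports
    ByteString payload = ByteString.copyFrom(new byte[16]);

    // Bytes returned by the datanode must match the chunk's declared length.
    if (payload.size() != declaredLen) {
      throw new IOException(String.format(
          "Inconsistent read: len=%d bytesRead=%d",
          declaredLen, payload.size()));
    }

    // Zero-copy, read-only views over the payload, as in the patch.
    List<ByteBuffer> buffers = payload.asReadOnlyByteBufferList();
    System.out.println("buffers: " + buffers.size());
  }
}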
@@ -260,4 +272,8 @@ public synchronized long getPos() throws IOException {
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
 }
@@ -115,19 +115,20 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
         return totalReadLen == 0 ? EOF : totalReadLen;
       }
       ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int readLen = Math.min(len, (int)current.getRemaining());
-      int actualLen = current.read(b, off, readLen);
-      // this means the underlying stream has nothing at all, return
-      if (actualLen == EOF) {
-        return totalReadLen > 0 ? totalReadLen : EOF;
+      int numBytesToRead = Math.min(len, (int)current.getRemaining());
+      int numBytesRead = current.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        // This implies that there is either data loss or corruption in the
+        // chunk entries. Even EOF in the current stream would be covered in
+        // this case.
+        throw new IOException(String.format(
+            "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
+            current.chunkInputStream.getBlockID(), current.length,
+            numBytesRead));
       }
-      totalReadLen += actualLen;
-      // this means there is no more data to read beyond this point, return
-      if (actualLen != readLen) {
-        return totalReadLen;
-      }
-      off += readLen;
-      len -= readLen;
+      totalReadLen += numBytesRead;
+      off += numBytesRead;
+      len -= numBytesRead;
       if (current.getRemaining() <= 0) {
         currentStreamIndex += 1;
       }
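With the loop fix in ChunkInputStream, a well-formed stream entry either fills the requested range or reports EOF, so the group stream can now treat any short read as data loss or corruption rather than a normal end-of-stream. A sketch of that stricter contract follows; the names are illustrative, not the Ozone API, and it assumes the underlying stream honors the fixed bulk-read behavior.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class StrictEntryRead {
  // Assumes the stream fills the request or returns EOF, never a
  // partial count (the contract established by the fix above).
  static int readStrict(InputStream in, byte[] b, int off, long remaining,
      int len) throws IOException {
    int numBytesToRead = (int) Math.min(len, remaining);
    int numBytesRead = in.read(b, off, numBytesToRead);
    if (numBytesRead != numBytesToRead) {
      // A short read here means the stored chunks disagree with the block
      // metadata, so fail loudly rather than return truncated data.
      throw new IOException(String.format(
          "Inconsistent read: expected=%d actual=%d",
          numBytesToRead, numBytesRead));
    }
    return numBytesRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] buf = new byte[8];
    int n = readStrict(new ByteArrayInputStream(new byte[8]), buf, 0, 8, 8);
    System.out.println("read " + n + " bytes");
  }
}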
@@ -68,7 +68,7 @@ public void ratisTestLargeKey() throws Exception {
     randomKeyGenerator.setNumOfKeys(1);
     randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.setFactor(ReplicationFactor.THREE);
-    randomKeyGenerator.setKeySize(104857600);
+    randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
@@ -84,7 +84,7 @@ public void standaloneTestLargeKey() throws Exception {
     randomKeyGenerator.setNumOfVolumes(1);
     randomKeyGenerator.setNumOfBuckets(1);
     randomKeyGenerator.setNumOfKeys(1);
-    randomKeyGenerator.setKeySize(104857600);
+    randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
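For reference, both tests drop the generated key size from 104857600 bytes (100 MB) to 20971520 bytes (20 MB). The named constants below are hypothetical and only spell out the inline values:

public final class KeySizes {
  // Hypothetical names; the tests inline the raw values.
  static final int OLD_KEY_SIZE = 100 * 1024 * 1024; // 104857600 bytes
  static final int NEW_KEY_SIZE = 20 * 1024 * 1024;  // 20971520 bytes
}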
@@ -35,6 +35,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;

+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -984,9 +985,9 @@ public void run() {
             writeValidationFailureCount++;
             LOG.warn("Data validation error for key {}/{}/{}",
                 kv.bucket.getVolumeName(), kv.bucket, kv.key);
-            LOG.warn("Expected: {}, Actual: {}",
-                DFSUtil.bytes2String(kv.value),
-                DFSUtil.bytes2String(value));
+            LOG.warn("Expected checksum: {}, Actual checksum: {}",
+                DigestUtils.md5Hex(kv.value),
+                DigestUtils.md5Hex(value));
           }
         }
       } catch (IOException | InterruptedException ex) {
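The validation-failure log now prints MD5 fingerprints via commons-codec instead of dumping raw key bytes through DFSUtil.bytes2String, which keeps the warning short and printable for large values. A minimal usage sketch, with made-up sample values:

import java.nio.charset.StandardCharsets;

import org.apache.commons.codec.digest.DigestUtils;

public final class ChecksumLogDemo {
  public static void main(String[] args) {
    byte[] expected = "written-value".getBytes(StandardCharsets.UTF_8);
    byte[] actual = "read-back-value".getBytes(StandardCharsets.UTF_8);
    // Compact hex fingerprints instead of raw (possibly huge) byte arrays.
    System.out.printf("Expected checksum: %s, Actual checksum: %s%n",
        DigestUtils.md5Hex(expected), DigestUtils.md5Hex(actual));
  }
}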