HDDS-1530. Freon support big files larger than 2GB and add --bufferSize and --validateWrites options. Contributed by Xudong Cao. (#830)

This commit is contained in:
Xudong Cao 2019-05-30 06:57:48 +08:00 committed by Xiaoyu Yao
parent 0ead2090a6
commit 9ad7cad205
2 changed files with 112 additions and 60 deletions

View File

@ -24,10 +24,8 @@
import java.io.PrintStream; import java.io.PrintStream;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -67,7 +65,6 @@
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import static java.lang.Math.min; import static java.lang.Math.min;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -75,6 +72,8 @@
import picocli.CommandLine.Command; import picocli.CommandLine.Command;
import picocli.CommandLine.Option; import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand; import picocli.CommandLine.ParentCommand;
import java.util.concurrent.LinkedBlockingQueue;
import java.security.MessageDigest;
/** /**
* Data generator tool to generate as much keys as possible. * Data generator tool to generate as much keys as possible.
@ -103,6 +102,12 @@ enum FreonOps {
private static final int QUANTILES = 10; private static final int QUANTILES = 10;
private byte[] keyValueBuffer = null;
private static final String DIGEST_ALGORITHM = "MD5";
// A common initial MesssageDigest for each key without its UUID
private MessageDigest commonInitialMD = null;
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RandomKeyGenerator.class); LoggerFactory.getLogger(RandomKeyGenerator.class);
@ -136,7 +141,20 @@ enum FreonOps {
description = "Specifies the size of Key in bytes to be created", description = "Specifies the size of Key in bytes to be created",
defaultValue = "10240" defaultValue = "10240"
) )
private int keySize = 10240; private long keySize = 10240;
@Option(
names = "--validateWrites",
description = "Specifies whether to validate keys after writing"
)
private boolean validateWrites = false;
@Option(
names = "--bufferSize",
description = "Specifies the buffer size while writing",
defaultValue = "4096"
)
private int bufferSize = 4096;
@Option( @Option(
names = "--json", names = "--json",
@ -159,9 +177,6 @@ enum FreonOps {
private ReplicationFactor factor = ReplicationFactor.ONE; private ReplicationFactor factor = ReplicationFactor.ONE;
private int threadPoolSize; private int threadPoolSize;
private byte[] keyValue = null;
private boolean validateWrites;
private OzoneClient ozoneClient; private OzoneClient ozoneClient;
private ObjectStore objectStore; private ObjectStore objectStore;
@ -185,7 +200,7 @@ enum FreonOps {
private Long writeValidationSuccessCount; private Long writeValidationSuccessCount;
private Long writeValidationFailureCount; private Long writeValidationFailureCount;
private BlockingQueue<KeyValue> validationQueue; private BlockingQueue<KeyValidate> validationQueue;
private ArrayList<Histogram> histograms = new ArrayList<>(); private ArrayList<Histogram> histograms = new ArrayList<>();
private OzoneConfiguration ozoneConfiguration; private OzoneConfiguration ozoneConfiguration;
@ -228,8 +243,20 @@ public Void call() throws Exception {
init(freon.createOzoneConfiguration()); init(freon.createOzoneConfiguration());
} }
keyValue = keyValueBuffer = DFSUtil.string2Bytes(
DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36)); RandomStringUtils.randomAscii(bufferSize));
// Compute the common initial digest for all keys without their UUID
if (validateWrites) {
commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM);
int uuidLength = UUID.randomUUID().toString().length();
keySize = Math.max(uuidLength, keySize);
for (long nrRemaining = keySize - uuidLength; nrRemaining > 0;
nrRemaining -= bufferSize) {
int curSize = (int)Math.min(bufferSize, nrRemaining);
commonInitialMD.update(keyValueBuffer, 0, curSize);
}
}
LOG.info("Number of Threads: " + numOfThreads); LOG.info("Number of Threads: " + numOfThreads);
threadPoolSize = threadPoolSize =
@ -241,6 +268,7 @@ public Void call() throws Exception {
LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
LOG.info("Number of Keys per Bucket: {}.", numOfKeys); LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
LOG.info("Key size: {} bytes", keySize); LOG.info("Key size: {} bytes", keySize);
LOG.info("Buffer size: {} bytes", bufferSize);
for (int i = 0; i < numOfVolumes; i++) { for (int i = 0; i < numOfVolumes; i++) {
String volume = "vol-" + i + "-" + String volume = "vol-" + i + "-" +
RandomStringUtils.randomNumeric(5); RandomStringUtils.randomNumeric(5);
@ -253,8 +281,7 @@ public Void call() throws Exception {
writeValidationSuccessCount = 0L; writeValidationSuccessCount = 0L;
writeValidationFailureCount = 0L; writeValidationFailureCount = 0L;
validationQueue = validationQueue = new LinkedBlockingQueue<>();
new ArrayBlockingQueue<>(numOfThreads);
validator = new Thread(new Validator()); validator = new Thread(new Validator());
validator.start(); validator.start();
LOG.info("Data validation is enabled."); LOG.info("Data validation is enabled.");
@ -512,43 +539,35 @@ long getUnsuccessfulValidationCount() {
} }
/** /**
* Returns the length of the common key value initialized. * Wrapper to hold ozone keyValidate entry.
*
* @return key value length initialized.
*/ */
@VisibleForTesting private static class KeyValidate {
long getKeyValueLength() {
return keyValue.length;
}
/** /**
* Wrapper to hold ozone key-value pair. * Bucket name.
*/
private static class KeyValue {
/**
* Bucket name associated with the key-value.
*/ */
private OzoneBucket bucket; private OzoneBucket bucket;
/**
* Key name associated with the key-value.
*/
private String key;
/**
* Value associated with the key-value.
*/
private byte[] value;
/** /**
* Constructs a new ozone key-value pair. * Key name.
*
* @param key key part
* @param value value part
*/ */
KeyValue(OzoneBucket bucket, String key, byte[] value) { private String keyName;
/**
* Digest of this key's full value.
*/
private byte[] digest;
/**
* Constructs a new ozone keyValidate.
*
* @param bucket bucket part
* @param keyName key part
* @param keyName digest of this key's full value
*/
KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) {
this.bucket = bucket; this.bucket = bucket;
this.key = key; this.keyName = keyName;
this.value = value; this.digest = digest;
} }
} }
@ -625,7 +644,11 @@ public void run() {
try (Scope writeScope = GlobalTracer.get() try (Scope writeScope = GlobalTracer.get()
.buildSpan("writeKeyData") .buildSpan("writeKeyData")
.startActive(true)) { .startActive(true)) {
os.write(keyValue); for (long nrRemaining = keySize - randomValue.length;
nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (int)Math.min(bufferSize, nrRemaining);
os.write(keyValueBuffer, 0, curSize);
}
os.write(randomValue); os.write(randomValue);
os.close(); os.close();
} }
@ -639,9 +662,10 @@ public void run() {
numberOfKeysAdded.getAndIncrement(); numberOfKeysAdded.getAndIncrement();
} }
if (validateWrites) { if (validateWrites) {
byte[] value = ArrayUtils.addAll(keyValue, randomValue); MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone();
tmpMD.update(randomValue);
boolean validate = validationQueue.offer( boolean validate = validationQueue.offer(
new KeyValue(bucket, key, value)); new KeyValidate(bucket, key, tmpMD.digest()));
if (validate) { if (validate) {
LOG.trace("Key {}, is queued for validation.", key); LOG.trace("Key {}, is queued for validation.", key);
} }
@ -678,7 +702,8 @@ private final class FreonJobInfo {
private String replicationFactor; private String replicationFactor;
private String replicationType; private String replicationType;
private int keySize; private long keySize;
private int bufferSize;
private String totalThroughputPerSecond; private String totalThroughputPerSecond;
@ -705,6 +730,7 @@ private FreonJobInfo() {
this.numOfKeys = RandomKeyGenerator.this.numOfKeys; this.numOfKeys = RandomKeyGenerator.this.numOfKeys;
this.numOfThreads = RandomKeyGenerator.this.numOfThreads; this.numOfThreads = RandomKeyGenerator.this.numOfThreads;
this.keySize = RandomKeyGenerator.this.keySize; this.keySize = RandomKeyGenerator.this.keySize;
this.bufferSize = RandomKeyGenerator.this.bufferSize;
this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime); this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime);
this.replicationFactor = RandomKeyGenerator.this.factor.name(); this.replicationFactor = RandomKeyGenerator.this.factor.name();
this.replicationType = RandomKeyGenerator.this.type.name(); this.replicationType = RandomKeyGenerator.this.type.name();
@ -856,10 +882,14 @@ public String getStatus() {
return status; return status;
} }
public int getKeySize() { public long getKeySize() {
return keySize; return keySize;
} }
public int getBufferSize() {
return bufferSize;
}
public String getGitBaseRevision() { public String getGitBaseRevision() {
return gitBaseRevision; return gitBaseRevision;
} }
@ -925,28 +955,32 @@ public String[] getTenQuantileKeyWriteTime() {
* Validates the write done in ozone cluster. * Validates the write done in ozone cluster.
*/ */
private class Validator implements Runnable { private class Validator implements Runnable {
@Override @Override
public void run() { public void run() {
while (!completed) { DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
try {
KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
OzoneInputStream is = kv.bucket.readKey(kv.key); while (true) {
byte[] value = new byte[kv.value.length]; if (completed && validationQueue.isEmpty()) {
int length = is.read(value); return;
}
try {
KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
OzoneInputStream is = kv.bucket.readKey(kv.keyName);
dig.getMessageDigest().reset();
byte[] curDigest = dig.digest(is);
totalWritesValidated++; totalWritesValidated++;
if (length == kv.value.length && Arrays.equals(value, kv.value)) { if (MessageDigest.isEqual(kv.digest, curDigest)) {
writeValidationSuccessCount++; writeValidationSuccessCount++;
} else { } else {
writeValidationFailureCount++; writeValidationFailureCount++;
LOG.warn("Data validation error for key {}/{}/{}", LOG.warn("Data validation error for key {}/{}/{}",
kv.bucket.getVolumeName(), kv.bucket, kv.key); kv.bucket.getVolumeName(), kv.bucket, kv.keyName);
LOG.warn("Expected checksum: {}, Actual checksum: {}", LOG.warn("Expected checksum: {}, Actual checksum: {}",
DigestUtils.md5Hex(kv.value), kv.digest, curDigest);
DigestUtils.md5Hex(value));
} }
is.close();
} }
} catch (IOException | InterruptedException ex) { } catch (IOException | InterruptedException ex) {
LOG.error("Exception while validating write: " + ex.getMessage()); LOG.error("Exception while validating write: " + ex.getMessage());
@ -976,7 +1010,7 @@ public void setNumOfThreads(int numOfThreads) {
} }
@VisibleForTesting @VisibleForTesting
public void setKeySize(int keySize) { public void setKeySize(long keySize) {
this.keySize = keySize; this.keySize = keySize;
} }

View File

@ -73,7 +73,6 @@ public void defaultTest() throws Exception {
Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated());
Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(10240 - 36, randomKeyGenerator.getKeyValueLength());
} }
@Test @Test
@ -109,4 +108,23 @@ public void ratisTest3() throws Exception {
Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded()); Assert.assertEquals(100, randomKeyGenerator.getNumberOfKeysAdded());
} }
@Test
public void bigFileThan2GB() throws Exception {
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
randomKeyGenerator.setNumOfVolumes(1);
randomKeyGenerator.setNumOfBuckets(1);
randomKeyGenerator.setNumOfKeys(1);
randomKeyGenerator.setNumOfThreads(1);
randomKeyGenerator.setKeySize(10L + Integer.MAX_VALUE);
randomKeyGenerator.setFactor(ReplicationFactor.THREE);
randomKeyGenerator.setType(ReplicationType.RATIS);
randomKeyGenerator.setValidateWrites(true);
randomKeyGenerator.call();
Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(1, randomKeyGenerator.getSuccessfulValidationCount());
}
} }