HDDS-898. Continue token should contain the previous dir in Ozone s3g object list. Contributed by Elek Marton.
This commit is contained in:
parent
f048512bb8
commit
54b11de2c0
@ -46,17 +46,16 @@
|
|||||||
import org.apache.hadoop.ozone.s3.endpoint.MultiDeleteResponse.Error;
|
import org.apache.hadoop.ozone.s3.endpoint.MultiDeleteResponse.Error;
|
||||||
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
|
import org.apache.hadoop.ozone.s3.util.ContinueToken;
|
||||||
|
import org.apache.hadoop.ozone.s3.util.S3StorageType;
|
||||||
|
|
||||||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.ozone.s3.util.S3StorageType;
|
import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE;
|
||||||
import org.apache.hadoop.ozone.s3.util.S3utils;
|
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bucket level rest endpoints.
|
* Bucket level rest endpoints.
|
||||||
*/
|
*/
|
||||||
@ -104,16 +103,17 @@ public Response list(
|
|||||||
|
|
||||||
Iterator<? extends OzoneKey> ozoneKeyIterator;
|
Iterator<? extends OzoneKey> ozoneKeyIterator;
|
||||||
|
|
||||||
String decodedToken = S3utils.decodeContinueToken(continueToken);
|
ContinueToken decodedToken =
|
||||||
|
ContinueToken.decodeFromString(continueToken);
|
||||||
|
|
||||||
if (startAfter != null && continueToken != null) {
|
if (startAfter != null && continueToken != null) {
|
||||||
// If continuation token and start after both are provided, then we
|
// If continuation token and start after both are provided, then we
|
||||||
// ignore start After
|
// ignore start After
|
||||||
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
|
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey());
|
||||||
} else if (startAfter != null && continueToken == null) {
|
} else if (startAfter != null && continueToken == null) {
|
||||||
ozoneKeyIterator = bucket.listKeys(prefix, startAfter);
|
ozoneKeyIterator = bucket.listKeys(prefix, startAfter);
|
||||||
} else if (startAfter == null && continueToken != null){
|
} else if (startAfter == null && continueToken != null){
|
||||||
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
|
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey());
|
||||||
} else {
|
} else {
|
||||||
ozoneKeyIterator = bucket.listKeys(prefix);
|
ozoneKeyIterator = bucket.listKeys(prefix);
|
||||||
}
|
}
|
||||||
@ -130,6 +130,9 @@ public Response list(
|
|||||||
response.setContinueToken(continueToken);
|
response.setContinueToken(continueToken);
|
||||||
|
|
||||||
String prevDir = null;
|
String prevDir = null;
|
||||||
|
if (continueToken != null) {
|
||||||
|
prevDir = decodedToken.getLastDir();
|
||||||
|
}
|
||||||
String lastKey = null;
|
String lastKey = null;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (ozoneKeyIterator.hasNext()) {
|
while (ozoneKeyIterator.hasNext()) {
|
||||||
@ -176,7 +179,8 @@ public Response list(
|
|||||||
response.setTruncated(false);
|
response.setTruncated(false);
|
||||||
} else if(ozoneKeyIterator.hasNext()) {
|
} else if(ozoneKeyIterator.hasNext()) {
|
||||||
response.setTruncated(true);
|
response.setTruncated(true);
|
||||||
response.setNextToken(S3utils.generateContinueToken(lastKey));
|
ContinueToken nextToken = new ContinueToken(lastKey, prevDir);
|
||||||
|
response.setNextToken(nextToken.encodeToString());
|
||||||
} else {
|
} else {
|
||||||
response.setTruncated(false);
|
response.setTruncated(false);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,173 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.s3.util;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.codec.DecoderException;
|
||||||
|
import org.apache.commons.codec.binary.Hex;
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Token which holds enough information to continue the key iteration.
|
||||||
|
*/
|
||||||
|
public class ContinueToken {
|
||||||
|
|
||||||
|
private String lastKey;
|
||||||
|
|
||||||
|
private String lastDir;
|
||||||
|
|
||||||
|
private static final String CONTINUE_TOKEN_SEPERATOR = "-";
|
||||||
|
|
||||||
|
public ContinueToken(String lastKey, String lastDir) {
|
||||||
|
Preconditions.checkNotNull(lastKey,
|
||||||
|
"The last key can't be null in the continue token.");
|
||||||
|
this.lastKey = lastKey;
|
||||||
|
if (lastDir != null && lastDir.length() > 0) {
|
||||||
|
this.lastDir = lastDir;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a continuation token which is used in get Bucket.
|
||||||
|
*
|
||||||
|
* @return if key is not null return continuation token, else returns null.
|
||||||
|
*/
|
||||||
|
public String encodeToString() {
|
||||||
|
if (this.lastKey != null) {
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer
|
||||||
|
.allocate(4 + lastKey.length()
|
||||||
|
+ (lastDir == null ? 0 : lastDir.length()));
|
||||||
|
buffer.putInt(lastKey.length());
|
||||||
|
buffer.put(lastKey.getBytes(StandardCharsets.UTF_8));
|
||||||
|
if (lastDir != null) {
|
||||||
|
buffer.put(lastDir.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
String hex = Hex.encodeHexString(buffer.array());
|
||||||
|
String digest = DigestUtils.sha256Hex(hex);
|
||||||
|
return hex + CONTINUE_TOKEN_SEPERATOR + digest;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decode a continuation token which is used in get Bucket.
|
||||||
|
*
|
||||||
|
* @param key
|
||||||
|
* @return if key is not null return decoded token, otherwise returns null.
|
||||||
|
* @throws OS3Exception
|
||||||
|
*/
|
||||||
|
public static ContinueToken decodeFromString(String key) throws OS3Exception {
|
||||||
|
if (key != null) {
|
||||||
|
int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR);
|
||||||
|
if (indexSeparator == -1) {
|
||||||
|
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key);
|
||||||
|
}
|
||||||
|
String hex = key.substring(0, indexSeparator);
|
||||||
|
String digest = key.substring(indexSeparator + 1);
|
||||||
|
try {
|
||||||
|
checkHash(key, hex, digest);
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(Hex.decodeHex(hex));
|
||||||
|
int keySize = buffer.getInt();
|
||||||
|
|
||||||
|
byte[] actualKeyBytes = new byte[keySize];
|
||||||
|
buffer.get(actualKeyBytes);
|
||||||
|
|
||||||
|
byte[] actualDirBytes = new byte[buffer.remaining()];
|
||||||
|
buffer.get(actualDirBytes);
|
||||||
|
|
||||||
|
return new ContinueToken(
|
||||||
|
new String(actualKeyBytes, StandardCharsets.UTF_8),
|
||||||
|
new String(actualDirBytes, StandardCharsets.UTF_8)
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (DecoderException ex) {
|
||||||
|
OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
|
||||||
|
.INVALID_ARGUMENT, key);
|
||||||
|
os3Exception.setErrorMessage("The continuation token provided is " +
|
||||||
|
"incorrect");
|
||||||
|
throw os3Exception;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkHash(String key, String hex, String digest)
|
||||||
|
throws OS3Exception {
|
||||||
|
String digestActualKey = DigestUtils.sha256Hex(hex);
|
||||||
|
if (!digest.equals(digestActualKey)) {
|
||||||
|
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
|
||||||
|
.INVALID_ARGUMENT, key);
|
||||||
|
ex.setErrorMessage("The continuation token provided is incorrect");
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLastKey() {
|
||||||
|
return lastKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastKey(String lastKey) {
|
||||||
|
this.lastKey = lastKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLastDir() {
|
||||||
|
return lastDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastDir(String lastDir) {
|
||||||
|
this.lastDir = lastDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ContinueToken that = (ContinueToken) o;
|
||||||
|
return lastKey.equals(that.lastKey) &&
|
||||||
|
Objects.equals(lastDir, that.lastDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(lastKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ContinueToken{" +
|
||||||
|
"lastKey='" + lastKey + '\'' +
|
||||||
|
", lastDir='" + lastDir + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
@ -18,19 +18,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.s3.util;
|
package org.apache.hadoop.ozone.s3.util;
|
||||||
|
|
||||||
import org.apache.commons.codec.DecoderException;
|
|
||||||
import org.apache.commons.codec.binary.Hex;
|
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
|
||||||
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
|
||||||
|
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.s3.util.S3Consts
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
.RANGE_HEADER_MATCH_PATTERN;
|
|
||||||
|
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_MATCH_PATTERN;
|
||||||
/**
|
/**
|
||||||
* Utility class for S3.
|
* Utility class for S3.
|
||||||
*/
|
*/
|
||||||
@ -40,60 +32,9 @@ public final class S3utils {
|
|||||||
private S3utils() {
|
private S3utils() {
|
||||||
|
|
||||||
}
|
}
|
||||||
private static final String CONTINUE_TOKEN_SEPERATOR = "-";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Generate a continuation token which is used in get Bucket.
|
|
||||||
* @param key
|
|
||||||
* @return if key is not null return continuation token, else returns null.
|
|
||||||
*/
|
|
||||||
public static String generateContinueToken(String key) {
|
|
||||||
if (key != null) {
|
|
||||||
byte[] byteData = key.getBytes(StandardCharsets.UTF_8);
|
|
||||||
String hex = Hex.encodeHexString(byteData);
|
|
||||||
String digest = DigestUtils.sha256Hex(key);
|
|
||||||
return hex + CONTINUE_TOKEN_SEPERATOR + digest;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decode a continuation token which is used in get Bucket.
|
|
||||||
* @param key
|
|
||||||
* @return if key is not null return decoded token, otherwise returns null.
|
|
||||||
* @throws OS3Exception
|
|
||||||
*/
|
|
||||||
public static String decodeContinueToken(String key) throws OS3Exception {
|
|
||||||
if (key != null) {
|
|
||||||
int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR);
|
|
||||||
if (indexSeparator == -1) {
|
|
||||||
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key);
|
|
||||||
}
|
|
||||||
String hex = key.substring(0, indexSeparator);
|
|
||||||
String digest = key.substring(indexSeparator + 1);
|
|
||||||
try {
|
|
||||||
byte[] actualKeyBytes = Hex.decodeHex(hex);
|
|
||||||
String digestActualKey = DigestUtils.sha256Hex(actualKeyBytes);
|
|
||||||
if (digest.equals(digestActualKey)) {
|
|
||||||
return new String(actualKeyBytes, StandardCharsets.UTF_8);
|
|
||||||
} else {
|
|
||||||
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
|
|
||||||
.INVALID_ARGUMENT, key);
|
|
||||||
ex.setErrorMessage("The continuation token provided is incorrect");
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
} catch (DecoderException ex) {
|
|
||||||
OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
|
|
||||||
.INVALID_ARGUMENT, key);
|
|
||||||
os3Exception.setErrorMessage("The continuation token provided is " +
|
|
||||||
"incorrect");
|
|
||||||
throw os3Exception;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -211,6 +211,53 @@ public void listWithContinuationToken() throws OS3Exception, IOException {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void listWithContinuationTokenDirBreak()
|
||||||
|
throws OS3Exception, IOException {
|
||||||
|
|
||||||
|
BucketEndpoint getBucket = new BucketEndpoint();
|
||||||
|
|
||||||
|
OzoneClient ozoneClient =
|
||||||
|
createClientWithKeys(
|
||||||
|
"test/dir1/file1",
|
||||||
|
"test/dir1/file2",
|
||||||
|
"test/dir1/file3",
|
||||||
|
"test/dir2/file4",
|
||||||
|
"test/dir2/file5",
|
||||||
|
"test/dir2/file6",
|
||||||
|
"test/dir3/file7",
|
||||||
|
"test/file8");
|
||||||
|
|
||||||
|
getBucket.setClient(ozoneClient);
|
||||||
|
|
||||||
|
int maxKeys = 2;
|
||||||
|
|
||||||
|
ListObjectResponse getBucketResponse;
|
||||||
|
|
||||||
|
getBucketResponse =
|
||||||
|
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
|
||||||
|
"test/", null, null, null, null).getEntity();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, getBucketResponse.getContents().size());
|
||||||
|
Assert.assertEquals(2, getBucketResponse.getCommonPrefixes().size());
|
||||||
|
Assert.assertEquals("test/dir1/",
|
||||||
|
getBucketResponse.getCommonPrefixes().get(0).getPrefix());
|
||||||
|
Assert.assertEquals("test/dir2/",
|
||||||
|
getBucketResponse.getCommonPrefixes().get(1).getPrefix());
|
||||||
|
|
||||||
|
getBucketResponse =
|
||||||
|
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
|
||||||
|
"test/", null, getBucketResponse.getNextToken(), null, null)
|
||||||
|
.getEntity();
|
||||||
|
Assert.assertEquals(1, getBucketResponse.getContents().size());
|
||||||
|
Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size());
|
||||||
|
Assert.assertEquals("test/dir3/",
|
||||||
|
getBucketResponse.getCommonPrefixes().get(0).getPrefix());
|
||||||
|
Assert.assertEquals("test/file8",
|
||||||
|
getBucketResponse.getContents().get(0).getKey());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/**
|
/**
|
||||||
* This test is with prefix and delimiter and verify continuation-token
|
* This test is with prefix and delimiter and verify continuation-token
|
||||||
@ -237,7 +284,6 @@ public void listWithContinuationToken1() throws OS3Exception, IOException {
|
|||||||
Assert.assertTrue(getBucketResponse.isTruncated());
|
Assert.assertTrue(getBucketResponse.isTruncated());
|
||||||
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
|
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
|
||||||
|
|
||||||
|
|
||||||
// 2nd time
|
// 2nd time
|
||||||
String continueToken = getBucketResponse.getNextToken();
|
String continueToken = getBucketResponse.getNextToken();
|
||||||
getBucketResponse =
|
getBucketResponse =
|
||||||
@ -246,7 +292,6 @@ public void listWithContinuationToken1() throws OS3Exception, IOException {
|
|||||||
Assert.assertTrue(getBucketResponse.isTruncated());
|
Assert.assertTrue(getBucketResponse.isTruncated());
|
||||||
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
|
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
|
||||||
|
|
||||||
|
|
||||||
//3rd time
|
//3rd time
|
||||||
continueToken = getBucketResponse.getNextToken();
|
continueToken = getBucketResponse.getNextToken();
|
||||||
getBucketResponse =
|
getBucketResponse =
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.s3.util;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test encode/decode of the continue token.
|
||||||
|
*/
|
||||||
|
public class TestContinueToken {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void encodeDecode() throws OS3Exception {
|
||||||
|
ContinueToken ct = new ContinueToken("key1", "dir1");
|
||||||
|
|
||||||
|
ContinueToken parsedToken =
|
||||||
|
ContinueToken.decodeFromString(ct.encodeToString());
|
||||||
|
|
||||||
|
Assert.assertEquals(ct, parsedToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void encodeDecodeNullDir() throws OS3Exception {
|
||||||
|
ContinueToken ct = new ContinueToken("key1", null);
|
||||||
|
|
||||||
|
ContinueToken parsedToken =
|
||||||
|
ContinueToken.decodeFromString(ct.encodeToString());
|
||||||
|
|
||||||
|
Assert.assertEquals(ct, parsedToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user