diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignedChunksInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignedChunksInputStream.java
new file mode 100644
index 0000000000..a35133f965
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignedChunksInputStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Input stream implementation to read body with chunked signatures.
+ *
+ * see: https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
+ */
+public class SignedChunksInputStream extends InputStream {
+
+ private Pattern signatureLinePattern =
+ Pattern.compile("([0-9A-Fa-f]+);chunk-signature=.*");
+
+ private InputStream originalStream;
+
+ /**
+ * Numer of following databits. If zero, the signature line should be parsed.
+ */
+ private int remainingData = 0;
+
+ public SignedChunksInputStream(InputStream inputStream) {
+ originalStream = inputStream;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (remainingData > 0) {
+ int curr = originalStream.read();
+ remainingData--;
+ if (remainingData == 0) {
+ //read the "\r\n" at the end of the data section
+ originalStream.read();
+ originalStream.read();
+ }
+ return curr;
+ } else {
+ remainingData = readHeader();
+ if (remainingData == -1) {
+ return -1;
+ }
+ return read();
+ }
+ }
+
+ private int readHeader() throws IOException {
+ int prev = -1;
+ int curr = 0;
+ StringBuilder buf = new StringBuilder();
+
+ //read everything until the next \r\n
+ while (!eol(prev, curr) && curr != -1) {
+ int next = originalStream.read();
+ if (next != -1) {
+ buf.append((char) next);
+ }
+ prev = curr;
+ curr = next;
+ }
+ String signatureLine = buf.toString().trim();
+ if (signatureLine.length() == 0) {
+ return -1;
+ }
+
+ //parse the data length.
+ Matcher matcher = signatureLinePattern.matcher(signatureLine);
+ if (matcher.matches()) {
+ return Integer.parseInt(matcher.group(1), 16);
+ } else {
+ throw new IOException("Invalid signature line: " + signatureLine);
+ }
+ }
+
+ private boolean eol(int prev, int curr) {
+ return prev == 13 && curr == 10;
+ }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 3f88af964e..c62293827e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -50,9 +50,11 @@
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
@@ -67,6 +69,9 @@ public class ObjectEndpoint extends EndpointBase {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectEndpoint.class);
+ @Context
+ private HttpHeaders headers;
+
private List customizableGetHeaders = new ArrayList<>();
public ObjectEndpoint() {
@@ -86,7 +91,6 @@ public ObjectEndpoint() {
*/
@PUT
public Response put(
- @Context HttpHeaders headers,
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@DefaultValue("STAND_ALONE") @QueryParam("replicationType")
@@ -106,6 +110,11 @@ public Response put(
OzoneOutputStream output = bucket
.createKey(keyPath, length, replicationType, replicationFactor);
+ if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
+ .equals(headers.getHeaderString("x-amz-content-sha256"))) {
+ body = new SignedChunksInputStream(body);
+ }
+
IOUtils.copy(body, output);
output.close();
@@ -125,7 +134,6 @@ public Response put(
*/
@GET
public Response get(
- @Context HttpHeaders headers,
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
InputStream body) throws IOException, OS3Exception {
@@ -227,4 +235,8 @@ public Response delete(
}
+ @VisibleForTesting
+ public void setHeaders(HttpHeaders headers) {
+ this.headers = headers;
+ }
}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestSignedChunksInputStream.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestSignedChunksInputStream.java
new file mode 100644
index 0000000000..27344e2e56
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestSignedChunksInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test input stream parsing with signatures.
+ */
+public class TestSignedChunksInputStream {
+
+ @Test
+ public void emptyfile() throws IOException {
+ InputStream is = fileContent("0;chunk-signature"
+ +
+ "=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40");
+ String result = IOUtils.toString(is, Charset.forName("UTF-8"));
+ Assert.assertEquals("", result);
+
+ is = fileContent("0;chunk-signature"
+ +
+ "=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
+ + "\n");
+ result = IOUtils.toString(is, Charset.forName("UTF-8"));
+ Assert.assertEquals("", result);
+ }
+
+ @Test
+ public void singlechunk() throws IOException {
+ InputStream is = fileContent("0A;chunk-signature"
+ +
+ "=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
+ + "\n1234567890\r\n");
+ String result = IOUtils.toString(is, Charset.forName("UTF-8"));
+ Assert.assertEquals("1234567890", result);
+ }
+
+ @Test
+ public void singlechunkwithoutend() throws IOException {
+ InputStream is = fileContent("0A;chunk-signature"
+ +
+ "=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
+ + "\n1234567890\r\n");
+ String result = IOUtils.toString(is, Charset.forName("UTF-8"));
+ Assert.assertEquals("1234567890", result);
+ }
+
+ @Test
+ public void multichunks() throws IOException {
+ InputStream is = fileContent("0a;chunk-signature=signature\r\n"
+ + "1234567890\r\n"
+ + "05;chunk-signature=signature\r\n"
+ + "abcde\r\n");
+ String result = IOUtils.toString(is, Charset.forName("UTF-8"));
+ Assert.assertEquals("1234567890abcde", result);
+ }
+
+ private InputStream fileContent(String content) {
+ return new SignedChunksInputStream(
+ new ByteArrayInputStream(content.getBytes()));
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
index 65abb1d57e..bd54896661 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
@@ -62,11 +62,11 @@ public void get() throws IOException, OS3Exception {
ObjectEndpoint rest = new ObjectEndpoint();
rest.setClient(client);
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
-
+ rest.setHeaders(headers);
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
//WHEN
- rest.get(headers, "b1", "key1", body);
+ rest.get("b1", "key1", body);
//THEN
OzoneInputStream ozoneInputStream =
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPutObject.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPutObject.java
index c3607dab4b..03b9a0f21c 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPutObject.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPutObject.java
@@ -38,6 +38,7 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
/**
* Test put object.
@@ -70,11 +71,12 @@ public void testPutObject() throws IOException, OS3Exception {
//GIVEN
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
+ objectEndpoint.setHeaders(headers);
//WHEN
- Response response = objectEndpoint.put(headers, bucketName, keyName,
+ Response response = objectEndpoint.put(bucketName, keyName,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "32 * 1024 * 1024",
- CONTENT.length(), body);
+ CONTENT.length(), body);
//THEN
String volumeName = clientStub.getObjectStore()
@@ -88,4 +90,39 @@ public void testPutObject() throws IOException, OS3Exception {
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(CONTENT, keyContent);
}
+
+ @Test
+ public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
+ //GIVEN
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ objectEndpoint.setHeaders(headers);
+
+ String chunkedContent = "0a;chunk-signature=signature\r\n"
+ + "1234567890\r\n"
+ + "05;chunk-signature=signature\r\n"
+ + "abcde\r\n";
+
+ when(headers.getHeaderString("x-amz-content-sha256"))
+ .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+
+ //WHEN
+ Response response = objectEndpoint.put(bucketName, keyName,
+ ReplicationType.STAND_ALONE,
+ ReplicationFactor.ONE,
+ "32 * 1024 * 1024",
+ chunkedContent.length(),
+ new ByteArrayInputStream(chunkedContent.getBytes()));
+
+ //THEN
+ String volumeName = clientStub.getObjectStore()
+ .getOzoneVolumeName(bucketName);
+ OzoneInputStream ozoneInputStream =
+ clientStub.getObjectStore().getVolume(volumeName).getBucket(bucketName)
+ .readKey(keyName);
+ String keyContent =
+ IOUtils.toString(ozoneInputStream, Charset.forName("UTF-8"));
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals("1234567890abcde", keyContent);
+ }
}
\ No newline at end of file