Compare commits
10 Commits
4e6432a0ab
...
f931ede86b
Author | SHA1 | Date | |
---|---|---|---|
|
f931ede86b | ||
|
6589d9f6aa | ||
|
c63aafd7d1 | ||
|
78a08b3b78 | ||
|
9321e322d2 | ||
|
e4b070025b | ||
|
dc56fc385a | ||
|
50e6b49e05 | ||
|
1f0d9df887 | ||
|
5ea3a1bd0a |
@ -1571,12 +1571,21 @@ function hadoop_finalize_hadoop_opts
|
|||||||
|
|
||||||
## @description Finish configuring JPMS that enforced for JDK 17 and higher
|
## @description Finish configuring JPMS that enforced for JDK 17 and higher
|
||||||
## @description prior to executing Java
|
## @description prior to executing Java
|
||||||
|
## @description keep this list sync with hadoop-project/pom.xml extraJavaTestArgs
|
||||||
## @audience private
|
## @audience private
|
||||||
## @stability evolving
|
## @stability evolving
|
||||||
## @replaceable yes
|
## @replaceable yes
|
||||||
function hadoop_finalize_jpms_opts
|
function hadoop_finalize_jpms_opts
|
||||||
{
|
{
|
||||||
hadoop_add_param HADOOP_OPTS IgnoreUnrecognizedVMOptions "-XX:+IgnoreUnrecognizedVMOptions"
|
hadoop_add_param HADOOP_OPTS IgnoreUnrecognizedVMOptions "-XX:+IgnoreUnrecognizedVMOptions"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.io "--add-opens=java.base/java.io=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.lang "--add-opens=java.base/java.lang=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.lang.reflect "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.math "--add-opens=java.base/java.math=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.net "--add-opens=java.base/java.net=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.text "--add-opens=java.base/java.text=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.util "--add-opens=java.base/java.util=ALL-UNNAMED"
|
||||||
|
hadoop_add_param HADOOP_OPTS open.java.util.concurrent "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED"
|
||||||
hadoop_add_param HADOOP_OPTS open.java.util.zip "--add-opens=java.base/java.util.zip=ALL-UNNAMED"
|
hadoop_add_param HADOOP_OPTS open.java.util.zip "--add-opens=java.base/java.util.zip=ALL-UNNAMED"
|
||||||
hadoop_add_param HADOOP_OPTS open.sun.security.util "--add-opens=java.base/sun.security.util=ALL-UNNAMED"
|
hadoop_add_param HADOOP_OPTS open.sun.security.util "--add-opens=java.base/sun.security.util=ALL-UNNAMED"
|
||||||
hadoop_add_param HADOOP_OPTS open.sun.security.x509 "--add-opens=java.base/sun.security.x509=ALL-UNNAMED"
|
hadoop_add_param HADOOP_OPTS open.sun.security.x509 "--add-opens=java.base/sun.security.x509=ALL-UNNAMED"
|
||||||
|
@ -68,7 +68,8 @@
|
|||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
|
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
|
||||||
|
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
|
||||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||||
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
|
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
|
||||||
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
|
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
|
||||||
@ -320,10 +321,10 @@ public void readVectored(List<? extends FileRange> ranges,
|
|||||||
IntFunction<ByteBuffer> allocate) throws IOException {
|
IntFunction<ByteBuffer> allocate) throws IOException {
|
||||||
|
|
||||||
// Validate, but do not pass in a file length as it may change.
|
// Validate, but do not pass in a file length as it may change.
|
||||||
List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
|
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
|
||||||
Optional.empty());
|
|
||||||
// Set up all of the futures, so that we can use them if things fail
|
// Set up all of the futures, so that we can use them if things fail
|
||||||
for(FileRange range: sortedRanges) {
|
for(FileRange range: sortedRanges) {
|
||||||
|
validateRangeRequest(range);
|
||||||
range.setData(new CompletableFuture<>());
|
range.setData(new CompletableFuture<>());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ -40,6 +41,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
||||||
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.http.NameValuePair;
|
import org.apache.http.NameValuePair;
|
||||||
import org.apache.http.client.utils.URLEncodedUtils;
|
import org.apache.http.client.utils.URLEncodedUtils;
|
||||||
|
|
||||||
@ -57,6 +59,13 @@
|
|||||||
* {@code org.apache.hadoop.fs.s3a.audit.TestHttpReferrerAuditHeader}
|
* {@code org.apache.hadoop.fs.s3a.audit.TestHttpReferrerAuditHeader}
|
||||||
* so as to verify that header generation in the S3A auditors, and
|
* so as to verify that header generation in the S3A auditors, and
|
||||||
* S3 log parsing, all work.
|
* S3 log parsing, all work.
|
||||||
|
* <p>
|
||||||
|
* This header may be shared across multiple threads at the same time.
|
||||||
|
* so some methods are marked as synchronized, specifically those reading
|
||||||
|
* or writing the attribute map.
|
||||||
|
* <p>
|
||||||
|
* For the same reason, maps and lists passed down during construction are
|
||||||
|
* copied into thread safe structures.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
@ -81,6 +90,14 @@ public final class HttpReferrerAuditHeader {
|
|||||||
private static final LogExactlyOnce WARN_OF_URL_CREATION =
|
private static final LogExactlyOnce WARN_OF_URL_CREATION =
|
||||||
new LogExactlyOnce(LOG);
|
new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Log for warning of an exception raised when building
|
||||||
|
* the referrer header, including building the evaluated
|
||||||
|
* attributes.
|
||||||
|
*/
|
||||||
|
private static final LogExactlyOnce ERROR_BUILDING_REFERRER_HEADER =
|
||||||
|
new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
/** Context ID. */
|
/** Context ID. */
|
||||||
private final String contextId;
|
private final String contextId;
|
||||||
|
|
||||||
@ -122,7 +139,11 @@ public final class HttpReferrerAuditHeader {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate.
|
* Instantiate.
|
||||||
*
|
* <p>
|
||||||
|
* All maps/enums passed down are copied into thread safe equivalents.
|
||||||
|
* as their origin is unknown and cannot be guaranteed to
|
||||||
|
* not be shared.
|
||||||
|
* <p>
|
||||||
* Context and operationId are expected to be well formed
|
* Context and operationId are expected to be well formed
|
||||||
* numeric/hex strings, at least adequate to be
|
* numeric/hex strings, at least adequate to be
|
||||||
* used as individual path elements in a URL.
|
* used as individual path elements in a URL.
|
||||||
@ -130,15 +151,15 @@ public final class HttpReferrerAuditHeader {
|
|||||||
private HttpReferrerAuditHeader(
|
private HttpReferrerAuditHeader(
|
||||||
final Builder builder) {
|
final Builder builder) {
|
||||||
this.contextId = requireNonNull(builder.contextId);
|
this.contextId = requireNonNull(builder.contextId);
|
||||||
this.evaluated = builder.evaluated;
|
this.evaluated = new ConcurrentHashMap<>(builder.evaluated);
|
||||||
this.filter = builder.filter;
|
this.filter = ImmutableSet.copyOf(builder.filter);
|
||||||
this.operationName = requireNonNull(builder.operationName);
|
this.operationName = requireNonNull(builder.operationName);
|
||||||
this.path1 = builder.path1;
|
this.path1 = builder.path1;
|
||||||
this.path2 = builder.path2;
|
this.path2 = builder.path2;
|
||||||
this.spanId = requireNonNull(builder.spanId);
|
this.spanId = requireNonNull(builder.spanId);
|
||||||
|
|
||||||
// copy the parameters from the builder and extend
|
// copy the parameters from the builder and extend
|
||||||
attributes = builder.attributes;
|
attributes = new ConcurrentHashMap<>(builder.attributes);
|
||||||
|
|
||||||
addAttribute(PARAM_OP, operationName);
|
addAttribute(PARAM_OP, operationName);
|
||||||
addAttribute(PARAM_PATH, path1);
|
addAttribute(PARAM_PATH, path1);
|
||||||
@ -166,17 +187,18 @@ private HttpReferrerAuditHeader(
|
|||||||
* per entry, and "" returned.
|
* per entry, and "" returned.
|
||||||
* @return a referrer string or ""
|
* @return a referrer string or ""
|
||||||
*/
|
*/
|
||||||
public String buildHttpReferrer() {
|
public synchronized String buildHttpReferrer() {
|
||||||
|
|
||||||
String header;
|
String header;
|
||||||
try {
|
try {
|
||||||
|
Map<String, String> requestAttrs = new HashMap<>(attributes);
|
||||||
String queries;
|
String queries;
|
||||||
// Update any params which are dynamically evaluated
|
// Update any params which are dynamically evaluated
|
||||||
evaluated.forEach((key, eval) ->
|
evaluated.forEach((key, eval) ->
|
||||||
addAttribute(key, eval.get()));
|
requestAttrs.put(key, eval.get()));
|
||||||
// now build the query parameters from all attributes, static and
|
// now build the query parameters from all attributes, static and
|
||||||
// evaluated, stripping out any from the filter
|
// evaluated, stripping out any from the filter
|
||||||
queries = attributes.entrySet().stream()
|
queries = requestAttrs.entrySet().stream()
|
||||||
.filter(e -> !filter.contains(e.getKey()))
|
.filter(e -> !filter.contains(e.getKey()))
|
||||||
.map(e -> e.getKey() + "=" + e.getValue())
|
.map(e -> e.getKey() + "=" + e.getValue())
|
||||||
.collect(Collectors.joining("&"));
|
.collect(Collectors.joining("&"));
|
||||||
@ -189,7 +211,14 @@ public String buildHttpReferrer() {
|
|||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
WARN_OF_URL_CREATION.warn("Failed to build URI for auditor: " + e, e);
|
WARN_OF_URL_CREATION.warn("Failed to build URI for auditor: " + e, e);
|
||||||
header = "";
|
header = "";
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
// do not let failure to build the header stop the request being
|
||||||
|
// issued.
|
||||||
|
ERROR_BUILDING_REFERRER_HEADER.warn("Failed to construct referred header {}", e.toString());
|
||||||
|
LOG.debug("Full stack", e);
|
||||||
|
header = "";
|
||||||
}
|
}
|
||||||
|
|
||||||
return header;
|
return header;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,7 +229,7 @@ public String buildHttpReferrer() {
|
|||||||
* @param key query key
|
* @param key query key
|
||||||
* @param value query value
|
* @param value query value
|
||||||
*/
|
*/
|
||||||
private void addAttribute(String key,
|
private synchronized void addAttribute(String key,
|
||||||
String value) {
|
String value) {
|
||||||
if (StringUtils.isNotEmpty(value)) {
|
if (StringUtils.isNotEmpty(value)) {
|
||||||
attributes.put(key, value);
|
attributes.put(key, value);
|
||||||
|
@ -623,8 +623,13 @@ support -and fallback everywhere else.
|
|||||||
|
|
||||||
The restriction "no overlapping ranges" was only initially enforced in
|
The restriction "no overlapping ranges" was only initially enforced in
|
||||||
the S3A connector, which would raise `UnsupportedOperationException`.
|
the S3A connector, which would raise `UnsupportedOperationException`.
|
||||||
Adding the range check as a precondition for all implementations guarantees
|
Adding the range check as a precondition for all implementations (Raw Local
|
||||||
consistent behavior everywhere.
|
being an exception) guarantees consistent behavior everywhere.
|
||||||
|
The reason Raw Local doesn't have this precondition is ChecksumFileSystem
|
||||||
|
creates the chunked ranges based on the checksum chunk size and then calls
|
||||||
|
readVectored on Raw Local which may lead to overlapping ranges in some cases.
|
||||||
|
For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-19291)
|
||||||
|
|
||||||
For reliable use with older hadoop releases with the API: sort the list of ranges
|
For reliable use with older hadoop releases with the API: sort the list of ranges
|
||||||
and check for overlaps before calling `readVectored()`.
|
and check for overlaps before calling `readVectored()`.
|
||||||
|
|
||||||
|
@ -270,13 +270,23 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Vectored IO doesn't support overlapping ranges.
|
* Most file systems won't support overlapping ranges.
|
||||||
|
* Currently, only Raw Local supports it.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testOverlappingRanges() throws Exception {
|
public void testOverlappingRanges() throws Exception {
|
||||||
verifyExceptionalVectoredRead(
|
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
|
||||||
getSampleOverlappingRanges(),
|
verifyExceptionalVectoredRead(
|
||||||
IllegalArgumentException.class);
|
getSampleOverlappingRanges(),
|
||||||
|
IllegalArgumentException.class);
|
||||||
|
} else {
|
||||||
|
try (FSDataInputStream in = openVectorFile()) {
|
||||||
|
List<FileRange> fileRanges = getSampleOverlappingRanges();
|
||||||
|
in.readVectored(fileRanges, allocate);
|
||||||
|
validateVectoredReadResult(fileRanges, DATASET, 0);
|
||||||
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -284,9 +294,18 @@ public void testOverlappingRanges() throws Exception {
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSameRanges() throws Exception {
|
public void testSameRanges() throws Exception {
|
||||||
verifyExceptionalVectoredRead(
|
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
|
||||||
getSampleSameRanges(),
|
verifyExceptionalVectoredRead(
|
||||||
IllegalArgumentException.class);
|
getSampleSameRanges(),
|
||||||
|
IllegalArgumentException.class);
|
||||||
|
} else {
|
||||||
|
try (FSDataInputStream in = openVectorFile()) {
|
||||||
|
List<FileRange> fileRanges = getSampleSameRanges();
|
||||||
|
in.readVectored(fileRanges, allocate);
|
||||||
|
validateVectoredReadResult(fileRanges, DATASET, 0);
|
||||||
|
returnBuffersToPoolPostRead(fileRanges, pool);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -329,10 +348,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception {
|
|||||||
public void testConsecutiveRanges() throws Exception {
|
public void testConsecutiveRanges() throws Exception {
|
||||||
List<FileRange> fileRanges = new ArrayList<>();
|
List<FileRange> fileRanges = new ArrayList<>();
|
||||||
final int offset = 500;
|
final int offset = 500;
|
||||||
final int length = 100;
|
final int length = 2011;
|
||||||
range(fileRanges, offset, length);
|
range(fileRanges, offset, length);
|
||||||
range(fileRanges, 600, 200);
|
range(fileRanges, offset + length, length);
|
||||||
range(fileRanges, 800, 100);
|
|
||||||
try (FSDataInputStream in = openVectorFile()) {
|
try (FSDataInputStream in = openVectorFile()) {
|
||||||
in.readVectored(fileRanges, allocate);
|
in.readVectored(fileRanges, allocate);
|
||||||
validateVectoredReadResult(fileRanges, DATASET, 0);
|
validateVectoredReadResult(fileRanges, DATASET, 0);
|
||||||
|
@ -261,4 +261,6 @@ public interface ContractOptions {
|
|||||||
* Does vector read check file length on open rather than in the read call?
|
* Does vector read check file length on open rather than in the read call?
|
||||||
*/
|
*/
|
||||||
String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";
|
String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";
|
||||||
|
|
||||||
|
String VECTOR_IO_OVERLAPPING_RANGES = "vector-io-overlapping-ranges";
|
||||||
}
|
}
|
||||||
|
@ -142,4 +142,9 @@
|
|||||||
<value>true</value>
|
<value>true</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.contract.vector-io-overlapping-ranges</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -233,7 +233,7 @@
|
|||||||
<configuration>
|
<configuration>
|
||||||
<reuseForks>false</reuseForks>
|
<reuseForks>false</reuseForks>
|
||||||
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
|
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
|
||||||
<argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
|
<argLine>${maven-surefire-plugin.argLine} -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
|
||||||
<environmentVariables>
|
<environmentVariables>
|
||||||
<!-- HADOOP_HOME required for tests on Windows to find winutils -->
|
<!-- HADOOP_HOME required for tests on Windows to find winutils -->
|
||||||
<HADOOP_HOME>${hadoop.common.build.dir}</HADOOP_HOME>
|
<HADOOP_HOME>${hadoop.common.build.dir}</HADOOP_HOME>
|
||||||
|
@ -1370,7 +1370,7 @@ public byte[] getXAttr(Path f, String name) throws IOException {
|
|||||||
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
||||||
Map<String, byte[]> xAttrs = createXAttrMap(
|
Map<String, byte[]> xAttrs = createXAttrMap(
|
||||||
(JSONArray) json.get(XATTRS_JSON));
|
(JSONArray) json.get(XATTRS_JSON));
|
||||||
return xAttrs != null ? xAttrs.get(name) : null;
|
return xAttrs.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Convert xAttrs json to xAttrs map */
|
/** Convert xAttrs json to xAttrs map */
|
||||||
|
@ -193,6 +193,8 @@ protected FSEditLogOp nextOp() throws IOException {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
prevException = e;
|
prevException = e;
|
||||||
state = State.STREAM_FAILED;
|
state = State.STREAM_FAILED;
|
||||||
|
LOG.warn("Got error skipUntil edit log input stream {}.", streams[curIdx].getName());
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
state = State.OK;
|
state = State.OK;
|
||||||
break;
|
break;
|
||||||
|
@ -0,0 +1,67 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestRedundantEditLogInputStream {
|
||||||
|
private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNextOp() throws IOException {
|
||||||
|
EditLogInputStream fakeStream1 = mock(EditLogInputStream.class);
|
||||||
|
EditLogInputStream fakeStream2 = mock(EditLogInputStream.class);
|
||||||
|
ArrayList<EditLogInputStream> list = new ArrayList();
|
||||||
|
list.add(fakeStream1);
|
||||||
|
list.add(fakeStream2);
|
||||||
|
for (int i = 0; i < list.size(); i++) {
|
||||||
|
EditLogInputStream stream = list.get(i);
|
||||||
|
when(stream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME + i);
|
||||||
|
when(stream.getFirstTxId()).thenReturn(1L);
|
||||||
|
when(stream.getLastTxId()).thenReturn(2L);
|
||||||
|
when(stream.length()).thenReturn(1L);
|
||||||
|
}
|
||||||
|
when(fakeStream1.skipUntil(1)).thenThrow(new IOException("skipUntil failed."));
|
||||||
|
when(fakeStream2.skipUntil(1)).thenReturn(true);
|
||||||
|
FSEditLogOp op = new MkdirOp();
|
||||||
|
op.setTransactionId(100);
|
||||||
|
when(fakeStream2.readOp()).thenReturn(op);
|
||||||
|
|
||||||
|
LogCapturer capture = LogCapturer.captureLogs(RedundantEditLogInputStream.LOG);
|
||||||
|
RedundantEditLogInputStream redundantEditLogInputStream =
|
||||||
|
new RedundantEditLogInputStream(list, 1);
|
||||||
|
|
||||||
|
FSEditLogOp returnOp = redundantEditLogInputStream.nextOp();
|
||||||
|
String log = capture.getOutput();
|
||||||
|
assertTrue(log.contains("Got error skipUntil edit log input stream FAKE_STREAM0"));
|
||||||
|
assertTrue(log.contains("Got error reading edit log input stream FAKE_STREAM0; "
|
||||||
|
+ "failing over to edit log FAKE_STREAM1"));
|
||||||
|
assertEquals(op, returnOp);
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
@ -302,6 +302,8 @@ public synchronized void close() throws IOException {
|
|||||||
try {
|
try {
|
||||||
if (in != null) {
|
if (in != null) {
|
||||||
in.close();
|
in.close();
|
||||||
|
} else if (fileIn != null) {
|
||||||
|
fileIn.close();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (decompressor != null) {
|
if (decompressor != null) {
|
||||||
|
@ -98,48 +98,53 @@ public void initialize(InputSplit genericSplit,
|
|||||||
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
||||||
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
||||||
fileIn = FutureIO.awaitFuture(builder.build());
|
fileIn = FutureIO.awaitFuture(builder.build());
|
||||||
|
|
||||||
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
|
|
||||||
if (null!=codec) {
|
|
||||||
isCompressedInput = true;
|
|
||||||
decompressor = CodecPool.getDecompressor(codec);
|
|
||||||
if (codec instanceof SplittableCompressionCodec) {
|
|
||||||
final SplitCompressionInputStream cIn =
|
|
||||||
((SplittableCompressionCodec)codec).createInputStream(
|
|
||||||
fileIn, decompressor, start, end,
|
|
||||||
SplittableCompressionCodec.READ_MODE.BYBLOCK);
|
|
||||||
in = new CompressedSplitLineReader(cIn, job,
|
|
||||||
this.recordDelimiterBytes);
|
|
||||||
start = cIn.getAdjustedStart();
|
|
||||||
end = cIn.getAdjustedEnd();
|
|
||||||
filePosition = cIn;
|
|
||||||
} else {
|
|
||||||
if (start != 0) {
|
|
||||||
// So we have a split that is only part of a file stored using
|
|
||||||
// a Compression codec that cannot be split.
|
|
||||||
throw new IOException("Cannot seek in " +
|
|
||||||
codec.getClass().getSimpleName() + " compressed stream");
|
|
||||||
}
|
|
||||||
|
|
||||||
in = new SplitLineReader(codec.createInputStream(fileIn,
|
try {
|
||||||
decompressor), job, this.recordDelimiterBytes);
|
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
|
||||||
|
if (null!=codec) {
|
||||||
|
isCompressedInput = true;
|
||||||
|
decompressor = CodecPool.getDecompressor(codec);
|
||||||
|
if (codec instanceof SplittableCompressionCodec) {
|
||||||
|
final SplitCompressionInputStream cIn =
|
||||||
|
((SplittableCompressionCodec)codec).createInputStream(
|
||||||
|
fileIn, decompressor, start, end,
|
||||||
|
SplittableCompressionCodec.READ_MODE.BYBLOCK);
|
||||||
|
in = new CompressedSplitLineReader(cIn, job,
|
||||||
|
this.recordDelimiterBytes);
|
||||||
|
start = cIn.getAdjustedStart();
|
||||||
|
end = cIn.getAdjustedEnd();
|
||||||
|
filePosition = cIn;
|
||||||
|
} else {
|
||||||
|
if (start != 0) {
|
||||||
|
// So we have a split that is only part of a file stored using
|
||||||
|
// a Compression codec that cannot be split.
|
||||||
|
throw new IOException("Cannot seek in " +
|
||||||
|
codec.getClass().getSimpleName() + " compressed stream");
|
||||||
|
}
|
||||||
|
|
||||||
|
in = new SplitLineReader(codec.createInputStream(fileIn,
|
||||||
|
decompressor), job, this.recordDelimiterBytes);
|
||||||
|
filePosition = fileIn;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fileIn.seek(start);
|
||||||
|
in = new UncompressedSplitLineReader(
|
||||||
|
fileIn, job, this.recordDelimiterBytes, split.getLength());
|
||||||
filePosition = fileIn;
|
filePosition = fileIn;
|
||||||
}
|
}
|
||||||
} else {
|
// If this is not the first split, we always throw away first record
|
||||||
fileIn.seek(start);
|
// because we always (except the last split) read one extra line in
|
||||||
in = new UncompressedSplitLineReader(
|
// next() method.
|
||||||
fileIn, job, this.recordDelimiterBytes, split.getLength());
|
if (start != 0) {
|
||||||
filePosition = fileIn;
|
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
|
||||||
|
}
|
||||||
|
this.pos = start;
|
||||||
|
} catch (Exception e) {
|
||||||
|
fileIn.close();
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
// If this is not the first split, we always throw away first record
|
|
||||||
// because we always (except the last split) read one extra line in
|
|
||||||
// next() method.
|
|
||||||
if (start != 0) {
|
|
||||||
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
|
|
||||||
}
|
|
||||||
this.pos = start;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private int maxBytesToConsume(long pos) {
|
private int maxBytesToConsume(long pos) {
|
||||||
return isCompressedInput
|
return isCompressedInput
|
||||||
|
@ -167,8 +167,19 @@
|
|||||||
<enforced.java.version>[${javac.version},)</enforced.java.version>
|
<enforced.java.version>[${javac.version},)</enforced.java.version>
|
||||||
<enforced.maven.version>[3.3.0,)</enforced.maven.version>
|
<enforced.maven.version>[3.3.0,)</enforced.maven.version>
|
||||||
|
|
||||||
|
<!-- keep this list sync with
|
||||||
|
hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh#hadoop_finalize_jpms_opts
|
||||||
|
-->
|
||||||
<extraJavaTestArgs>
|
<extraJavaTestArgs>
|
||||||
-XX:+IgnoreUnrecognizedVMOptions
|
-XX:+IgnoreUnrecognizedVMOptions
|
||||||
|
--add-opens=java.base/java.io=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.lang=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.math=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.net=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.text=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.util=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
|
||||||
--add-opens=java.base/java.util.zip=ALL-UNNAMED
|
--add-opens=java.base/java.util.zip=ALL-UNNAMED
|
||||||
--add-opens=java.base/sun.security.util=ALL-UNNAMED
|
--add-opens=java.base/sun.security.util=ALL-UNNAMED
|
||||||
--add-opens=java.base/sun.security.x509=ALL-UNNAMED
|
--add-opens=java.base/sun.security.x509=ALL-UNNAMED
|
||||||
@ -2743,6 +2754,16 @@
|
|||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
</profile>
|
</profile>
|
||||||
|
<!-- We added this profile to support compilation for JDK 9 and above. -->
|
||||||
|
<profile>
|
||||||
|
<id>java9</id>
|
||||||
|
<activation>
|
||||||
|
<jdk>[9,)</jdk>
|
||||||
|
</activation>
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.release>${javac.version}</maven.compiler.release>
|
||||||
|
</properties>
|
||||||
|
</profile>
|
||||||
</profiles>
|
</profiles>
|
||||||
|
|
||||||
<repositories>
|
<repositories>
|
||||||
|
@ -398,6 +398,21 @@ private Constants() {
|
|||||||
public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
|
public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
|
||||||
Duration.ofSeconds(60);
|
Duration.ofSeconds(60);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for uploading all of a small object or a single part
|
||||||
|
* of a larger one.
|
||||||
|
* {@value}.
|
||||||
|
* Default unit is milliseconds for consistency with other options.
|
||||||
|
*/
|
||||||
|
public static final String PART_UPLOAD_TIMEOUT =
|
||||||
|
"fs.s3a.connection.part.upload.timeout";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default part upload timeout: 15 minutes.
|
||||||
|
*/
|
||||||
|
public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
|
||||||
|
Duration.ofMinutes(15);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should TCP Keepalive be enabled on the socket?
|
* Should TCP Keepalive be enabled on the socket?
|
||||||
* This adds some network IO, but finds failures faster.
|
* This adds some network IO, but finds failures faster.
|
||||||
|
@ -271,8 +271,10 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
|
|||||||
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
|
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
|
||||||
* If no region is configured, try to parse region from endpoint. </li>
|
* If no region is configured, try to parse region from endpoint. </li>
|
||||||
* <li> If no region is configured, and it could not be parsed from the endpoint,
|
* <li> If no region is configured, and it could not be parsed from the endpoint,
|
||||||
* set the default region as US_EAST_2 and enable cross region access. </li>
|
* set the default region as US_EAST_2</li>
|
||||||
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
|
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
|
||||||
|
* <li> S3 cross region is enabled by default irrespective of region or endpoint
|
||||||
|
* is set or not.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*
|
*
|
||||||
* @param builder S3 client builder.
|
* @param builder S3 client builder.
|
||||||
|
@ -1286,6 +1286,13 @@ protected RequestFactory createRequestFactory() {
|
|||||||
STORAGE_CLASS);
|
STORAGE_CLASS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional custom timeout for bulk uploads
|
||||||
|
Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
|
||||||
|
PART_UPLOAD_TIMEOUT,
|
||||||
|
DEFAULT_PART_UPLOAD_TIMEOUT,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
Duration.ZERO);
|
||||||
|
|
||||||
return RequestFactoryImpl.builder()
|
return RequestFactoryImpl.builder()
|
||||||
.withBucket(requireNonNull(bucket))
|
.withBucket(requireNonNull(bucket))
|
||||||
.withCannedACL(getCannedACL())
|
.withCannedACL(getCannedACL())
|
||||||
@ -1295,6 +1302,7 @@ protected RequestFactory createRequestFactory() {
|
|||||||
.withContentEncoding(contentEncoding)
|
.withContentEncoding(contentEncoding)
|
||||||
.withStorageClass(storageClass)
|
.withStorageClass(storageClass)
|
||||||
.withMultipartUploadEnabled(isMultipartUploadEnabled)
|
.withMultipartUploadEnabled(isMultipartUploadEnabled)
|
||||||
|
.withPartUploadTimeout(partUploadTimeout)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,7 +700,8 @@ public SdkResponse modifyResponse(Context.ModifyResponse context,
|
|||||||
* span is deactivated.
|
* span is deactivated.
|
||||||
* Package-private for testing.
|
* Package-private for testing.
|
||||||
*/
|
*/
|
||||||
private final class WrappingAuditSpan extends AbstractAuditSpanImpl {
|
@VisibleForTesting
|
||||||
|
final class WrappingAuditSpan extends AbstractAuditSpanImpl {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inner span.
|
* Inner span.
|
||||||
@ -792,6 +793,15 @@ public boolean isValidSpan() {
|
|||||||
return isValid && span.isValidSpan();
|
return isValid && span.isValidSpan();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the inner span.
|
||||||
|
* @return the span.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
AuditSpanS3A getSpan() {
|
||||||
|
return span;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Forward to the inner span.
|
* Forward to the inner span.
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.audit.AuditConstants;
|
import org.apache.hadoop.fs.audit.AuditConstants;
|
||||||
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
||||||
@ -252,6 +253,17 @@ private void setLastHeader(final String lastHeader) {
|
|||||||
this.lastHeader = lastHeader;
|
this.lastHeader = lastHeader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the referrer provided the span is an instance or
|
||||||
|
* subclass of LoggingAuditSpan.
|
||||||
|
* @param span span
|
||||||
|
* @return the referrer
|
||||||
|
* @throws ClassCastException if a different span type was passed in
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
|
||||||
|
return ((LoggingAuditSpan) span).getReferrer();
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Span which logs at debug and sets the HTTP referrer on
|
* Span which logs at debug and sets the HTTP referrer on
|
||||||
* invocations.
|
* invocations.
|
||||||
@ -441,10 +453,10 @@ public String toString() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the referrer; visible for tests.
|
* Get the referrer.
|
||||||
* @return the referrer.
|
* @return the referrer.
|
||||||
*/
|
*/
|
||||||
HttpReferrerAuditHeader getReferrer() {
|
private HttpReferrerAuditHeader getReferrer() {
|
||||||
return referrer;
|
return referrer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,8 @@
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import software.amazon.awssdk.awscore.AwsRequest;
|
||||||
|
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
|
||||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||||
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
|
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
|
||||||
import software.amazon.awssdk.core.retry.RetryMode;
|
import software.amazon.awssdk.core.retry.RetryMode;
|
||||||
@ -623,4 +625,24 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
|
|||||||
socketTimeout);
|
socketTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a custom ApiCallTimeout for a single request.
|
||||||
|
* This allows for a longer timeout to be used in data upload
|
||||||
|
* requests than that for all other S3 interactions;
|
||||||
|
* This does not happen by default in the V2 SDK
|
||||||
|
* (see HADOOP-19295).
|
||||||
|
* <p>
|
||||||
|
* If the timeout is zero, the request is not patched.
|
||||||
|
* @param builder builder to patch.
|
||||||
|
* @param timeout timeout
|
||||||
|
*/
|
||||||
|
public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
|
||||||
|
if (!timeout.isZero()) {
|
||||||
|
builder.overrideConfiguration(
|
||||||
|
AwsRequestOverrideConfiguration.builder()
|
||||||
|
.apiCallTimeout(timeout)
|
||||||
|
.apiCallAttemptTimeout(timeout)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.impl;
|
package org.apache.hadoop.fs.s3a.impl;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -59,7 +60,9 @@
|
|||||||
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
|
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
||||||
import static org.apache.hadoop.util.Preconditions.checkArgument;
|
import static org.apache.hadoop.util.Preconditions.checkArgument;
|
||||||
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
||||||
@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
|
|||||||
*/
|
*/
|
||||||
private final boolean isMultipartUploadEnabled;
|
private final boolean isMultipartUploadEnabled;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for uploading objects/parts.
|
||||||
|
* This will be set on data put/post operations only.
|
||||||
|
*/
|
||||||
|
private final Duration partUploadTimeout;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
* @param builder builder with all the configuration.
|
* @param builder builder with all the configuration.
|
||||||
@ -142,6 +151,7 @@ protected RequestFactoryImpl(
|
|||||||
this.contentEncoding = builder.contentEncoding;
|
this.contentEncoding = builder.contentEncoding;
|
||||||
this.storageClass = builder.storageClass;
|
this.storageClass = builder.storageClass;
|
||||||
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
|
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
|
||||||
|
this.partUploadTimeout = builder.partUploadTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -344,6 +354,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
|
|||||||
putObjectRequestBuilder.storageClass(storageClass);
|
putObjectRequestBuilder.storageClass(storageClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the timeout for object uploads but not directory markers.
|
||||||
|
if (!isDirectoryMarker) {
|
||||||
|
setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
return prepareRequest(putObjectRequestBuilder);
|
return prepareRequest(putObjectRequestBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -595,6 +610,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
|
|||||||
.partNumber(partNumber)
|
.partNumber(partNumber)
|
||||||
.contentLength(size);
|
.contentLength(size);
|
||||||
uploadPartEncryptionParameters(builder);
|
uploadPartEncryptionParameters(builder);
|
||||||
|
|
||||||
|
// Set the request timeout for the part upload
|
||||||
|
setRequestTimeout(builder, partUploadTimeout);
|
||||||
return prepareRequest(builder);
|
return prepareRequest(builder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -702,6 +720,13 @@ public static final class RequestFactoryBuilder {
|
|||||||
*/
|
*/
|
||||||
private boolean isMultipartUploadEnabled = true;
|
private boolean isMultipartUploadEnabled = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for uploading objects/parts.
|
||||||
|
* This will be set on data put/post operations only.
|
||||||
|
* A zero value means "no custom timeout"
|
||||||
|
*/
|
||||||
|
private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
|
||||||
|
|
||||||
private RequestFactoryBuilder() {
|
private RequestFactoryBuilder() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -799,6 +824,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled(
|
|||||||
this.isMultipartUploadEnabled = value;
|
this.isMultipartUploadEnabled = value;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for uploading objects/parts.
|
||||||
|
* This will be set on data put/post operations only.
|
||||||
|
* A zero value means "no custom timeout"
|
||||||
|
* @param value new value
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
|
||||||
|
partUploadTimeout = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream>
|
|||||||
*/
|
*/
|
||||||
private T currentStream;
|
private T currentStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When did this upload start?
|
||||||
|
* Use in error messages.
|
||||||
|
*/
|
||||||
|
private final LocalDateTime startTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
* @param size size of the data. Must be non-negative.
|
* @param size size of the data. Must be non-negative.
|
||||||
@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
|
|||||||
checkArgument(size >= 0, "size is negative: %s", size);
|
checkArgument(size >= 0, "size is negative: %s", size);
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.isOpen = isOpen;
|
this.isOpen = isOpen;
|
||||||
|
this.startTime = LocalDateTime.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -274,8 +282,11 @@ public final InputStream newStream() {
|
|||||||
close();
|
close();
|
||||||
checkOpen();
|
checkOpen();
|
||||||
streamCreationCount++;
|
streamCreationCount++;
|
||||||
if (streamCreationCount > 1) {
|
if (streamCreationCount == 2) {
|
||||||
LOG.info("Stream created more than once: {}", this);
|
// the stream has been recreated for the first time.
|
||||||
|
// notify only once for this stream, so as not to flood
|
||||||
|
// the logs.
|
||||||
|
LOG.info("Stream recreated: {}", this);
|
||||||
}
|
}
|
||||||
return setCurrentStream(createNewStream());
|
return setCurrentStream(createNewStream());
|
||||||
}
|
}
|
||||||
@ -302,6 +313,14 @@ public int getSize() {
|
|||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When did this upload start?
|
||||||
|
* @return start time
|
||||||
|
*/
|
||||||
|
public LocalDateTime getStartTime() {
|
||||||
|
return startTime;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Current stream.
|
* Current stream.
|
||||||
* When {@link #newStream()} is called, this is set to the new value,
|
* When {@link #newStream()} is called, this is set to the new value,
|
||||||
@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return "BaseContentProvider{" +
|
return "BaseContentProvider{" +
|
||||||
"size=" + size +
|
"size=" + size +
|
||||||
|
", initiated at " + startTime +
|
||||||
", streamCreationCount=" + streamCreationCount +
|
", streamCreationCount=" + streamCreationCount +
|
||||||
", currentStream=" + currentStream +
|
", currentStream=" + currentStream +
|
||||||
'}';
|
'}';
|
||||||
|
@ -356,6 +356,10 @@ public void testWithOutCrossRegionAccess() throws Exception {
|
|||||||
// skip the test if the region is sa-east-1
|
// skip the test if the region is sa-east-1
|
||||||
skipCrossRegionTest();
|
skipCrossRegionTest();
|
||||||
final Configuration newConf = new Configuration(getConfiguration());
|
final Configuration newConf = new Configuration(getConfiguration());
|
||||||
|
removeBaseAndBucketOverrides(newConf,
|
||||||
|
ENDPOINT,
|
||||||
|
AWS_S3_CROSS_REGION_ACCESS_ENABLED,
|
||||||
|
AWS_REGION);
|
||||||
// disable cross region access
|
// disable cross region access
|
||||||
newConf.setBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, false);
|
newConf.setBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, false);
|
||||||
newConf.set(AWS_REGION, SA_EAST_1);
|
newConf.set(AWS_REGION, SA_EAST_1);
|
||||||
@ -374,6 +378,7 @@ public void testWithCrossRegionAccess() throws Exception {
|
|||||||
skipCrossRegionTest();
|
skipCrossRegionTest();
|
||||||
final Configuration newConf = new Configuration(getConfiguration());
|
final Configuration newConf = new Configuration(getConfiguration());
|
||||||
removeBaseAndBucketOverrides(newConf,
|
removeBaseAndBucketOverrides(newConf,
|
||||||
|
ENDPOINT,
|
||||||
AWS_S3_CROSS_REGION_ACCESS_ENABLED,
|
AWS_S3_CROSS_REGION_ACCESS_ENABLED,
|
||||||
AWS_REGION);
|
AWS_REGION);
|
||||||
// enable cross region access
|
// enable cross region access
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
|
|||||||
public void testPutObjectDirect() throws Throwable {
|
public void testPutObjectDirect() throws Throwable {
|
||||||
final S3AFileSystem fs = getFileSystem();
|
final S3AFileSystem fs = getFileSystem();
|
||||||
try (AuditSpan span = span()) {
|
try (AuditSpan span = span()) {
|
||||||
RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
|
RequestFactory factory = RequestFactoryImpl.builder()
|
||||||
|
.withBucket(fs.getBucket())
|
||||||
|
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
|
||||||
|
.build();
|
||||||
Path path = path("putDirect");
|
Path path = path("putDirect");
|
||||||
PutObjectRequest.Builder putObjectRequestBuilder =
|
PutObjectRequest.Builder putObjectRequestBuilder =
|
||||||
factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
|
factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
|
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
|
||||||
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
|
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
|
||||||
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
||||||
@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
|
|||||||
.withRequestPreparer(MockS3AFileSystem::prepareRequest)
|
.withRequestPreparer(MockS3AFileSystem::prepareRequest)
|
||||||
.withBucket(BUCKET)
|
.withBucket(BUCKET)
|
||||||
.withEncryptionSecrets(new EncryptionSecrets())
|
.withEncryptionSecrets(new EncryptionSecrets())
|
||||||
|
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import software.amazon.awssdk.http.SdkHttpRequest;
|
import software.amazon.awssdk.http.SdkHttpRequest;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -32,6 +33,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
|
import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
|
||||||
|
import org.apache.hadoop.fs.s3a.audit.impl.ReferrerExtractor;
|
||||||
import org.apache.hadoop.fs.store.audit.AuditSpan;
|
import org.apache.hadoop.fs.store.audit.AuditSpan;
|
||||||
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
import org.apache.hadoop.fs.audit.CommonAuditContext;
|
||||||
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
||||||
@ -417,4 +419,33 @@ private void expectStrippedField(final String str,
|
|||||||
.describedAs("Stripped <%s>", str)
|
.describedAs("Stripped <%s>", str)
|
||||||
.isEqualTo(ex);
|
.isEqualTo(ex);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/**
|
||||||
|
* Verify that exceptions raised when building referrer headers
|
||||||
|
* do not result in failures, just an empty header.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSpanResilience() throws Throwable {
|
||||||
|
final CommonAuditContext auditContext = CommonAuditContext.currentAuditContext();
|
||||||
|
final String failing = "failing";
|
||||||
|
auditContext.put(failing, () -> {
|
||||||
|
throw new RuntimeException("raised");
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
final HttpReferrerAuditHeader referrer = ReferrerExtractor.getReferrer(auditor, span());
|
||||||
|
Assertions.assertThat(referrer.buildHttpReferrer())
|
||||||
|
.describedAs("referrer header")
|
||||||
|
.isBlank();
|
||||||
|
// repeat
|
||||||
|
LOG.info("second attempt: there should be no second warning below");
|
||||||
|
Assertions.assertThat(referrer.buildHttpReferrer())
|
||||||
|
.describedAs("referrer header 2")
|
||||||
|
.isBlank();
|
||||||
|
referrer.buildHttpReferrer();
|
||||||
|
} finally {
|
||||||
|
// critical to remove this so it doesn't interfere with any other
|
||||||
|
// tests
|
||||||
|
auditContext.remove(failing);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -0,0 +1,52 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.audit.impl;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
|
||||||
|
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the referrer from a LoggingAuditor through a package-private
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
public final class ReferrerExtractor {
|
||||||
|
|
||||||
|
private ReferrerExtractor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the referrer provided the span is an instance or
|
||||||
|
* subclass of LoggingAuditSpan.
|
||||||
|
* If wrapped by a {@code WrappingAuditSpan}, it will be extracted.
|
||||||
|
* @param auditor the auditor.
|
||||||
|
* @param span span
|
||||||
|
* @return the referrer
|
||||||
|
* @throws ClassCastException if a different span type was passed in
|
||||||
|
*/
|
||||||
|
public static HttpReferrerAuditHeader getReferrer(LoggingAuditor auditor,
|
||||||
|
AuditSpanS3A span) {
|
||||||
|
AuditSpanS3A sp;
|
||||||
|
if (span instanceof ActiveAuditManagerS3A.WrappingAuditSpan) {
|
||||||
|
sp = ((ActiveAuditManagerS3A.WrappingAuditSpan) span).getSpan();
|
||||||
|
} else {
|
||||||
|
sp = span;
|
||||||
|
}
|
||||||
|
return auditor.getReferrer(sp);
|
||||||
|
}
|
||||||
|
}
|
@ -153,7 +153,7 @@ public Configuration createConfiguration() {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
SdkFaultInjector.resetEvaluator();
|
SdkFaultInjector.resetFaultInjector();
|
||||||
super.setup();
|
super.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,7 +161,7 @@ public void setup() throws Exception {
|
|||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
// safety check in case the evaluation is failing any
|
// safety check in case the evaluation is failing any
|
||||||
// request needed in cleanup.
|
// request needed in cleanup.
|
||||||
SdkFaultInjector.resetEvaluator();
|
SdkFaultInjector.resetFaultInjector();
|
||||||
|
|
||||||
super.teardown();
|
super.teardown();
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,9 @@
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -33,8 +35,12 @@
|
|||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
|
||||||
|
import org.apache.hadoop.fs.s3a.test.SdkFaultInjector;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
|
import org.apache.hadoop.util.DurationInfo;
|
||||||
|
import org.apache.hadoop.util.OperationDuration;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
|
||||||
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
|
||||||
@ -42,16 +48,19 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
|
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
|
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
|
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
|
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
|
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
|
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
|
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
|
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
|
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
|
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
|
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
|
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
|
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
@ -63,7 +72,7 @@
|
|||||||
* The likely cause is actually -Dprefetch test runs as these return connections to
|
* The likely cause is actually -Dprefetch test runs as these return connections to
|
||||||
* the pool.
|
* the pool.
|
||||||
* However, it is also important to have a non-brittle FS for creating the test file
|
* However, it is also important to have a non-brittle FS for creating the test file
|
||||||
* and teardow, again, this makes for a flaky test..
|
* and teardown, again, this makes for a flaky test.
|
||||||
*/
|
*/
|
||||||
public class ITestConnectionTimeouts extends AbstractS3ATestBase {
|
public class ITestConnectionTimeouts extends AbstractS3ATestBase {
|
||||||
|
|
||||||
@ -72,6 +81,23 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
|
|||||||
*/
|
*/
|
||||||
public static final int FILE_SIZE = 1024;
|
public static final int FILE_SIZE = 1024;
|
||||||
|
|
||||||
|
public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
|
||||||
|
|
||||||
|
public static final Duration UPLOAD_DURATION = Duration.ofSeconds(15);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration createConfiguration() {
|
||||||
|
final Configuration conf = super.createConfiguration();
|
||||||
|
removeBaseAndBucketOverrides(conf,
|
||||||
|
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
|
||||||
|
PART_UPLOAD_TIMEOUT);
|
||||||
|
setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, UPLOAD_DURATION);
|
||||||
|
|
||||||
|
// set this so teardown will clean pending uploads.
|
||||||
|
conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a configuration for an FS which has timeouts set to very low values
|
* Create a configuration for an FS which has timeouts set to very low values
|
||||||
* and no retries.
|
* and no retries.
|
||||||
@ -86,6 +112,7 @@ private Configuration timingOutConfiguration() {
|
|||||||
ESTABLISH_TIMEOUT,
|
ESTABLISH_TIMEOUT,
|
||||||
MAX_ERROR_RETRIES,
|
MAX_ERROR_RETRIES,
|
||||||
MAXIMUM_CONNECTIONS,
|
MAXIMUM_CONNECTIONS,
|
||||||
|
PART_UPLOAD_TIMEOUT,
|
||||||
PREFETCH_ENABLED_KEY,
|
PREFETCH_ENABLED_KEY,
|
||||||
REQUEST_TIMEOUT,
|
REQUEST_TIMEOUT,
|
||||||
SOCKET_TIMEOUT,
|
SOCKET_TIMEOUT,
|
||||||
@ -118,7 +145,6 @@ public void teardown() throws Exception {
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testGeneratePoolTimeouts() throws Throwable {
|
public void testGeneratePoolTimeouts() throws Throwable {
|
||||||
byte[] data = dataset(FILE_SIZE, '0', 10);
|
|
||||||
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
|
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
|
||||||
Configuration conf = timingOutConfiguration();
|
Configuration conf = timingOutConfiguration();
|
||||||
Path path = methodPath();
|
Path path = methodPath();
|
||||||
@ -127,7 +153,7 @@ public void testGeneratePoolTimeouts() throws Throwable {
|
|||||||
final S3AFileSystem fs = getFileSystem();
|
final S3AFileSystem fs = getFileSystem();
|
||||||
// create the test file using the good fs, to avoid connection timeouts
|
// create the test file using the good fs, to avoid connection timeouts
|
||||||
// during setup.
|
// during setup.
|
||||||
ContractTestUtils.createFile(fs, path, true, data);
|
ContractTestUtils.createFile(fs, path, true, DATASET);
|
||||||
final FileStatus st = fs.getFileStatus(path);
|
final FileStatus st = fs.getFileStatus(path);
|
||||||
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
|
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
|
||||||
intercept(ConnectTimeoutException.class, () -> {
|
intercept(ConnectTimeoutException.class, () -> {
|
||||||
@ -148,4 +174,102 @@ public void testGeneratePoolTimeouts() throws Throwable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that different timeouts are used for object upload operations.
|
||||||
|
* The PUT operation can take longer than the value set as the
|
||||||
|
* connection.request.timeout, but other operations (GET) will
|
||||||
|
* fail.
|
||||||
|
* <p>
|
||||||
|
* This test tries to balance "being fast" with "not failing assertions
|
||||||
|
* in parallel test runs".
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testObjectUploadTimeouts() throws Throwable {
|
||||||
|
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
|
||||||
|
final Path dir = methodPath();
|
||||||
|
Path file = new Path(dir, "file");
|
||||||
|
Configuration conf = new Configuration(getConfiguration());
|
||||||
|
removeBaseAndBucketOverrides(conf,
|
||||||
|
PART_UPLOAD_TIMEOUT,
|
||||||
|
REQUEST_TIMEOUT,
|
||||||
|
FS_S3A_PERFORMANCE_FLAGS
|
||||||
|
);
|
||||||
|
|
||||||
|
// skip all checks
|
||||||
|
conf.set(FS_S3A_PERFORMANCE_FLAGS, PerformanceFlagEnum.Create.name());
|
||||||
|
final int uploadTimeout = 10;
|
||||||
|
// uploads have a long timeout
|
||||||
|
final Duration uploadDuration = Duration.ofSeconds(uploadTimeout);
|
||||||
|
setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, uploadDuration);
|
||||||
|
|
||||||
|
// other requests a short one
|
||||||
|
final Duration shortTimeout = Duration.ofSeconds(5);
|
||||||
|
setDurationAsMillis(conf, REQUEST_TIMEOUT, shortTimeout);
|
||||||
|
setDurationAsMillis(conf, CONNECTION_ACQUISITION_TIMEOUT, shortTimeout);
|
||||||
|
conf.setInt(RETRY_LIMIT, 0);
|
||||||
|
|
||||||
|
SdkFaultInjector.resetFaultInjector();
|
||||||
|
// total sleep time is tracked for extra assertions
|
||||||
|
final AtomicLong totalSleepTime = new AtomicLong(0);
|
||||||
|
// fault injector is set to sleep for a bit less than the upload timeout.
|
||||||
|
final long sleepTime = uploadDuration.toMillis() - 2000;
|
||||||
|
SdkFaultInjector.setAction((req, resp) -> {
|
||||||
|
totalSleepTime.addAndGet(sleepTime);
|
||||||
|
LOG.info("sleeping {} millis", sleepTime);
|
||||||
|
try {
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
}
|
||||||
|
return resp;
|
||||||
|
});
|
||||||
|
SdkFaultInjector.setRequestFailureConditions(999,
|
||||||
|
SdkFaultInjector::isPutRequest);
|
||||||
|
SdkFaultInjector.addFaultInjection(conf);
|
||||||
|
final S3AFileSystem fs = getFileSystem();
|
||||||
|
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
|
||||||
|
OperationDuration dur = new DurationInfo(LOG, "Creating File");
|
||||||
|
ContractTestUtils.createFile(brittleFS, file, true, DATASET);
|
||||||
|
dur.finished();
|
||||||
|
Assertions.assertThat(totalSleepTime.get())
|
||||||
|
.describedAs("total sleep time of PUT")
|
||||||
|
.isGreaterThan(0);
|
||||||
|
Assertions.assertThat(dur.asDuration())
|
||||||
|
.describedAs("Duration of write")
|
||||||
|
.isGreaterThan(shortTimeout)
|
||||||
|
.isLessThan(uploadDuration);
|
||||||
|
|
||||||
|
// reading the file will fail because sleepiing
|
||||||
|
totalSleepTime.set(0);
|
||||||
|
LOG.debug("attempting read");
|
||||||
|
SdkFaultInjector.setRequestFailureConditions(999,
|
||||||
|
SdkFaultInjector::isGetRequest);
|
||||||
|
// the exact IOE depends on what failed; if it is in the http read it will be a
|
||||||
|
// software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
|
||||||
|
// which is too low level to safely assert about.
|
||||||
|
// it can also surface as an UncheckedIOException wrapping the inner cause.
|
||||||
|
intercept(Exception.class, () ->
|
||||||
|
ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
|
||||||
|
Assertions.assertThat(totalSleepTime.get())
|
||||||
|
.describedAs("total sleep time of read")
|
||||||
|
.isGreaterThan(0);
|
||||||
|
|
||||||
|
// and try a multipart upload to verify that its requests also outlast
|
||||||
|
// the short requests
|
||||||
|
SdkFaultInjector.setRequestFailureConditions(999,
|
||||||
|
SdkFaultInjector::isPartUpload);
|
||||||
|
Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
|
||||||
|
totalSleepTime.set(0);
|
||||||
|
OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
|
||||||
|
ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
|
||||||
|
dur2.finished();
|
||||||
|
Assertions.assertThat(totalSleepTime.get())
|
||||||
|
.describedAs("total sleep time of magic write")
|
||||||
|
.isGreaterThan(0);
|
||||||
|
Assertions.assertThat(dur2.asDuration())
|
||||||
|
.describedAs("Duration of magic write")
|
||||||
|
.isGreaterThan(shortTimeout);
|
||||||
|
brittleFS.delete(dir, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,16 +19,21 @@
|
|||||||
package org.apache.hadoop.fs.s3a.impl;
|
package org.apache.hadoop.fs.s3a.impl;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import software.amazon.awssdk.awscore.AwsRequest;
|
import software.amazon.awssdk.awscore.AwsRequest;
|
||||||
|
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
|
||||||
import software.amazon.awssdk.core.SdkRequest;
|
import software.amazon.awssdk.core.SdkRequest;
|
||||||
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
|
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
|
||||||
import org.assertj.core.api.Assertions;
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
|
||||||
|
import software.amazon.awssdk.services.s3.model.S3Request;
|
||||||
|
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.PathIOException;
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
@ -38,6 +43,7 @@
|
|||||||
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
|
|||||||
.isEqualTo(acl);
|
.isEqualTo(acl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Now add a processor and verify that it was invoked for
|
* Now add a processor and verify that it was invoked for
|
||||||
* exactly as many requests as were analyzed.
|
* exactly as many requests as were analyzed.
|
||||||
@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
|
|||||||
.isEqualTo(requestsAnalyzed);
|
.isEqualTo(requestsAnalyzed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assertion for Request timeouts.
|
||||||
|
* @param duration expected duration.
|
||||||
|
* @param request request.
|
||||||
|
*/
|
||||||
|
private void assertApiTimeouts(Duration duration, S3Request request) {
|
||||||
|
Assertions.assertThat(request.overrideConfiguration())
|
||||||
|
.describedAs("request %s", request)
|
||||||
|
.isNotEmpty();
|
||||||
|
final AwsRequestOverrideConfiguration override =
|
||||||
|
request.overrideConfiguration().get();
|
||||||
|
Assertions.assertThat(override.apiCallAttemptTimeout())
|
||||||
|
.describedAs("apiCallAttemptTimeout")
|
||||||
|
.hasValue(duration);
|
||||||
|
Assertions.assertThat(override.apiCallTimeout())
|
||||||
|
.describedAs("apiCallTimeout")
|
||||||
|
.hasValue(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If not overridden timeouts are set to the default part upload timeout.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDefaultUploadTimeouts() throws Throwable {
|
||||||
|
|
||||||
|
RequestFactory factory = RequestFactoryImpl.builder()
|
||||||
|
.withBucket("bucket")
|
||||||
|
.withMultipartPartCountLimit(2)
|
||||||
|
.build();
|
||||||
|
final UploadPartRequest upload =
|
||||||
|
factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
|
||||||
|
assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that when upload request timeouts are set,
|
||||||
|
* they are passed down.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUploadTimeouts() throws Throwable {
|
||||||
|
Duration partDuration = Duration.ofDays(1);
|
||||||
|
RequestFactory factory = RequestFactoryImpl.builder()
|
||||||
|
.withBucket("bucket")
|
||||||
|
.withPartUploadTimeout(partDuration)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
String path = "path";
|
||||||
|
|
||||||
|
// A simple PUT
|
||||||
|
final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
|
||||||
|
PutObjectOptions.deletingDirs(), 1024, false).build();
|
||||||
|
assertApiTimeouts(partDuration, put);
|
||||||
|
|
||||||
|
// multipart part
|
||||||
|
final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
|
||||||
|
"1", 3, 128_000_000)
|
||||||
|
.build();
|
||||||
|
assertApiTimeouts(partDuration, upload);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -166,7 +166,7 @@ protected Configuration createScaleConfiguration() {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
SdkFaultInjector.resetEvaluator();
|
SdkFaultInjector.resetFaultInjector();
|
||||||
super.setup();
|
super.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -174,7 +174,7 @@ public void setup() throws Exception {
|
|||||||
public void teardown() throws Exception {
|
public void teardown() throws Exception {
|
||||||
// safety check in case the evaluation is failing any
|
// safety check in case the evaluation is failing any
|
||||||
// request needed in cleanup.
|
// request needed in cleanup.
|
||||||
SdkFaultInjector.resetEvaluator();
|
SdkFaultInjector.resetFaultInjector();
|
||||||
|
|
||||||
super.teardown();
|
super.teardown();
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
|
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
|
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
|
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
|
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
|
|
||||||
@ -78,12 +79,13 @@ protected Configuration createScaleConfiguration() {
|
|||||||
MIN_MULTIPART_THRESHOLD,
|
MIN_MULTIPART_THRESHOLD,
|
||||||
MULTIPART_UPLOADS_ENABLED,
|
MULTIPART_UPLOADS_ENABLED,
|
||||||
MULTIPART_SIZE,
|
MULTIPART_SIZE,
|
||||||
|
PART_UPLOAD_TIMEOUT,
|
||||||
REQUEST_TIMEOUT);
|
REQUEST_TIMEOUT);
|
||||||
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
|
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
|
||||||
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
|
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
|
||||||
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
|
||||||
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
|
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
|
||||||
conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
|
conf.set(PART_UPLOAD_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.fs.s3a.test;
|
package org.apache.hadoop.fs.s3a.test;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -35,6 +36,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
|
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
|
||||||
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
|
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
|
||||||
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
|
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
|
||||||
@ -77,6 +79,13 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
|
|||||||
*/
|
*/
|
||||||
private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW;
|
private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to take on failure.
|
||||||
|
*/
|
||||||
|
private static BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse>
|
||||||
|
action = SdkFaultInjector::patchStatusCode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the value of {@link #FAILURE_STATUS_CODE}.
|
* Update the value of {@link #FAILURE_STATUS_CODE}.
|
||||||
* @param value new value
|
* @param value new value
|
||||||
@ -97,10 +106,14 @@ public static void setEvaluator(Function<Context.ModifyHttpResponse, Boolean> va
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the evaluator to enable everything.
|
* Reset fault injection.
|
||||||
|
* The evaluator will enable everything;
|
||||||
|
* the failure action is set to
|
||||||
|
* {@link #patchStatusCode(SdkRequest, SdkHttpResponse)}.
|
||||||
*/
|
*/
|
||||||
public static void resetEvaluator() {
|
public static void resetFaultInjector() {
|
||||||
setEvaluator(ALWAYS_ALLOW);
|
setEvaluator(ALWAYS_ALLOW);
|
||||||
|
setAction(SdkFaultInjector::patchStatusCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -123,6 +136,23 @@ public static void setRequestFailureConditions(final int attempts,
|
|||||||
setEvaluator(condition);
|
setEvaluator(condition);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the action to invoke.
|
||||||
|
* @param action new action.
|
||||||
|
*/
|
||||||
|
public static void setAction(BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse> action) {
|
||||||
|
SdkFaultInjector.action = requireNonNull(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is the response being processed from a GET request?
|
||||||
|
* @param context request context.
|
||||||
|
* @return true if the request is of the right type.
|
||||||
|
*/
|
||||||
|
public static boolean isGetRequest(final Context.ModifyHttpResponse context) {
|
||||||
|
return context.httpRequest().method().equals(SdkHttpMethod.GET);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is the response being processed from a PUT request?
|
* Is the response being processed from a PUT request?
|
||||||
* @param context request context.
|
* @param context request context.
|
||||||
@ -168,6 +198,8 @@ public static boolean isMultipartAbort(final Context.ModifyHttpResponse context)
|
|||||||
return context.request() instanceof AbortMultipartUploadRequest;
|
return context.request() instanceof AbortMultipartUploadRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Review response from S3 and optionall modify its status code.
|
* Review response from S3 and optionall modify its status code.
|
||||||
* @return the original response or a copy with a different status code.
|
* @return the original response or a copy with a different status code.
|
||||||
@ -179,14 +211,7 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
|
|||||||
SdkHttpResponse httpResponse = context.httpResponse();
|
SdkHttpResponse httpResponse = context.httpResponse();
|
||||||
if (evaluator.apply(context) && shouldFail()) {
|
if (evaluator.apply(context) && shouldFail()) {
|
||||||
|
|
||||||
// fail the request
|
return action.apply(request, httpResponse);
|
||||||
final int code = FAILURE_STATUS_CODE.get();
|
|
||||||
LOG.info("Fault Injector returning {} error code for request {}",
|
|
||||||
code, request);
|
|
||||||
|
|
||||||
return httpResponse.copy(b -> {
|
|
||||||
b.statusCode(code);
|
|
||||||
});
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// pass unchanged
|
// pass unchanged
|
||||||
@ -194,6 +219,25 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default fault injector: patch the status code with the value in
|
||||||
|
* {@link #FAILURE_STATUS_CODE}.
|
||||||
|
* @param request original request
|
||||||
|
* @param httpResponse ongoing response
|
||||||
|
* @return modified response.
|
||||||
|
*/
|
||||||
|
public static SdkHttpResponse patchStatusCode(final SdkRequest request,
|
||||||
|
final SdkHttpResponse httpResponse) {
|
||||||
|
// fail the request
|
||||||
|
final int code = FAILURE_STATUS_CODE.get();
|
||||||
|
LOG.info("Fault Injector returning {} error code for request {}",
|
||||||
|
code, request);
|
||||||
|
|
||||||
|
return httpResponse.copy(b -> {
|
||||||
|
b.statusCode(code);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should the request fail based on the failure count?
|
* Should the request fail based on the failure count?
|
||||||
* @return true if the request count means a request must fail
|
* @return true if the request count means a request must fail
|
||||||
|
@ -143,7 +143,7 @@
|
|||||||
<forkCount>1</forkCount>
|
<forkCount>1</forkCount>
|
||||||
<reuseForks>false</reuseForks>
|
<reuseForks>false</reuseForks>
|
||||||
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
|
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
|
||||||
<argLine>-Xmx1024m</argLine>
|
<argLine>${maven-surefire-plugin.argLine} -Xmx1024m</argLine>
|
||||||
<includes>
|
<includes>
|
||||||
<include>**/Test*.java</include>
|
<include>**/Test*.java</include>
|
||||||
</includes>
|
</includes>
|
||||||
|
@ -147,7 +147,7 @@
|
|||||||
<forkCount>1</forkCount>
|
<forkCount>1</forkCount>
|
||||||
<reuseForks>false</reuseForks>
|
<reuseForks>false</reuseForks>
|
||||||
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
|
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
|
||||||
<argLine>-Xmx1024m</argLine>
|
<argLine>${maven-surefire-plugin.argLine} -Xmx1024m</argLine>
|
||||||
<includes>
|
<includes>
|
||||||
<include>**/Test*.java</include>
|
<include>**/Test*.java</include>
|
||||||
</includes>
|
</includes>
|
||||||
|
@ -170,6 +170,9 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
|
|||||||
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
|
||||||
Resource totalPreemptionAllowed, boolean readOnly) {
|
Resource totalPreemptionAllowed, boolean readOnly) {
|
||||||
RMContainer reservedContainer = node.getReservedContainer();
|
RMContainer reservedContainer = node.getReservedContainer();
|
||||||
|
if (reservedContainer == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
Resource available = Resources.clone(node.getUnallocatedResource());
|
Resource available = Resources.clone(node.getUnallocatedResource());
|
||||||
Resource totalSelected = Resources.createResource(0);
|
Resource totalSelected = Resources.createResource(0);
|
||||||
List<RMContainer> sortedRunningContainers =
|
List<RMContainer> sortedRunningContainers =
|
||||||
|
@ -876,10 +876,9 @@ private void completeOustandingUpdatesWhichAreReserved(
|
|||||||
RMContainer rmContainer, ContainerStatus containerStatus,
|
RMContainer rmContainer, ContainerStatus containerStatus,
|
||||||
RMContainerEventType event) {
|
RMContainerEventType event) {
|
||||||
N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
|
N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
|
||||||
if (schedulerNode != null &&
|
if (schedulerNode != null) {
|
||||||
schedulerNode.getReservedContainer() != null) {
|
|
||||||
RMContainer resContainer = schedulerNode.getReservedContainer();
|
RMContainer resContainer = schedulerNode.getReservedContainer();
|
||||||
if (resContainer.getReservedSchedulerKey() != null) {
|
if (resContainer != null && resContainer.getReservedSchedulerKey() != null) {
|
||||||
ContainerId containerToUpdate = resContainer
|
ContainerId containerToUpdate = resContainer
|
||||||
.getReservedSchedulerKey().getContainerToUpdate();
|
.getReservedSchedulerKey().getContainerToUpdate();
|
||||||
if (containerToUpdate != null &&
|
if (containerToUpdate != null &&
|
||||||
|
@ -858,12 +858,13 @@ private ContainerAllocation allocate(Resource clusterResource,
|
|||||||
FiCaSchedulerNode node = iter.next();
|
FiCaSchedulerNode node = iter.next();
|
||||||
|
|
||||||
// Do not schedule if there are any reservations to fulfill on the node
|
// Do not schedule if there are any reservations to fulfill on the node
|
||||||
|
RMContainer nodeReservedContainer = node.getReservedContainer();
|
||||||
if (iter.hasNext() &&
|
if (iter.hasNext() &&
|
||||||
node.getReservedContainer() != null &&
|
nodeReservedContainer != null &&
|
||||||
isSkipAllocateOnNodesWithReservedContainer()) {
|
isSkipAllocateOnNodesWithReservedContainer()) {
|
||||||
LOG.debug("Skipping scheduling on node {} since it has already been"
|
LOG.debug("Skipping scheduling on node {} since it has already been"
|
||||||
+ " reserved by {}", node.getNodeID(),
|
+ " reserved by {}", node.getNodeID(),
|
||||||
node.getReservedContainer().getContainerId());
|
nodeReservedContainer.getContainerId());
|
||||||
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
|
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
|
||||||
activitiesManager, node, application, schedulerKey,
|
activitiesManager, node, application, schedulerKey,
|
||||||
ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
|
ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
|
||||||
|
@ -520,13 +520,13 @@ public boolean accept(Resource cluster,
|
|||||||
// When reserve a resource (state == NEW is for new container,
|
// When reserve a resource (state == NEW is for new container,
|
||||||
// state == RUNNING is for increase container).
|
// state == RUNNING is for increase container).
|
||||||
// Just check if the node is not already reserved by someone
|
// Just check if the node is not already reserved by someone
|
||||||
if (schedulerContainer.getSchedulerNode().getReservedContainer()
|
RMContainer reservedContainer =
|
||||||
!= null) {
|
schedulerContainer.getSchedulerNode().getReservedContainer();
|
||||||
|
if (reservedContainer != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Try to reserve a container, but the node is "
|
LOG.debug("Try to reserve a container, but the node is "
|
||||||
+ "already reserved by another container="
|
+ "already reserved by another container="
|
||||||
+ schedulerContainer.getSchedulerNode()
|
+ reservedContainer.getContainerId());
|
||||||
.getReservedContainer().getContainerId());
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -491,6 +491,8 @@
|
|||||||
</plugin>
|
</plugin>
|
||||||
<!-- The fork value is deliberately set to 0 to avoid VM crash while running tests
|
<!-- The fork value is deliberately set to 0 to avoid VM crash while running tests
|
||||||
on Jenkins, removing this leads to tests crashing silently due to VM crash -->
|
on Jenkins, removing this leads to tests crashing silently due to VM crash -->
|
||||||
|
<!-- TODO: we should investigate and address the crash issue and re-enable fork,
|
||||||
|
otherwise, JPMS args does not take effect -->
|
||||||
<plugin>
|
<plugin>
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
<configuration>
|
<configuration>
|
||||||
|
Loading…
Reference in New Issue
Block a user