Compare commits

...

10 Commits

Author SHA1 Message Date
slfan1989
f931ede86b
HADOOP-19298. [JDK17] Add a JDK17 profile. (#7085) Contributed by Shilun Fan.
Reviewed-by: Steve Loughran <stevel@apache.org>
Reviewed-by: Attila Doroszlai <adoroszlai@apache.org>
Reviewed-by: Cheng Pan <chengpan@apache.org>
Reviewed-by: Min Yan <yaommen@gmail.com>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
2024-10-18 17:16:33 +08:00
LiuGuH
6589d9f6aa
HDFS-17631. Fix RedundantEditLogInputStream.nextOp() state error when EditLogInputStream.skipUntil() throws IOException (#7066). Contributed by liuguanghua.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
2024-10-16 21:16:18 +08:00
Tao Yang
c63aafd7d1
YARN-11732. Fix potential NPE when calling SchedulerNode#reservedContainer for CapacityScheduler (#7065). Contributed by Tao Yang.
Reviewed-by: Syed Shameerur Rahman <syedthameem1@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
2024-10-16 21:11:31 +08:00
Davin Tjong
78a08b3b78
MAPREDUCE-7494. File stream leak when LineRecordReader is interrupted (#7117)
Contributed by Davin Tjong
2024-10-16 11:41:18 +01:00
Cheng Pan
9321e322d2
HADOOP-19310. Add JPMS options required by Java 17+ (#7114) Contributed by Cheng Pan.
Reviewed-by: Attila Doroszlai <adoroszlai@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
2024-10-16 14:15:01 +08:00
Mukund Thakur
e4b070025b
HADOOP-19291. RawLocalFileSystem to allow overlapping ranges (#7101)
ChecksumFileSystem creates the chunked ranges based on the checksum chunk size and then calls
readVectored on Raw Local which may lead to overlapping ranges in some cases.

Contributed by: Mukund Thakur
2024-10-09 08:34:47 -05:00
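
A minimal sketch of the overlapping-range scenario this commit permits on the local file system. It is not code from the patch: the file path, range sizes, and the use of `getRawFileSystem()`/`createFileRange()` are illustrative assumptions based on the public vectored-read API.

```java
// Hypothetical example: two deliberately overlapping ranges read through the
// raw local file system. After HADOOP-19291 this is accepted there; object
// stores such as S3A may still reject overlaps with IllegalArgumentException.
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public class OverlappingVectoredReadSketch {
  public static void main(String[] args) throws Exception {
    // the raw (checksum-free) local file system, the layer ChecksumFileSystem delegates to
    FileSystem raw = FileSystem.getLocal(new Configuration()).getRawFileSystem();
    Path file = new Path("file:///tmp/data.bin");   // placeholder: any local file >= 1536 bytes
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),         // bytes 0..1023
        FileRange.createFileRange(512, 1024));      // bytes 512..1535: overlaps the first range
    try (FSDataInputStream in = raw.open(file)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange r : ranges) {
        ByteBuffer buf = FutureIO.awaitFuture(r.getData());
        System.out.println("offset " + r.getOffset() + ": " + buf.remaining() + " bytes");
      }
    }
  }
}
```
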
Steve Loughran
dc56fc385a
HADOOP-19295. S3A: large uploads can timeout over slow links (#7089)
This sets a different timeout for data upload PUT/POST calls than for all
other requests, so that slow block uploads do not trigger timeouts
as rapidly as normal requests do. This was always the behavior
in the V1 AWS SDK; for V2 we have to explicitly set it on the operations
we want to give extended timeouts.

Option:  fs.s3a.connection.part.upload.timeout
Default: 15m

Contributed by Steve Loughran
2024-10-07 17:57:13 +01:00
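
A hedged illustration of tuning the option named above. The key string and the 15-minute default come from the commit message; the contrast with fs.s3a.connection.request.timeout reflects the "all other requests" behaviour described there, and the specific values chosen are arbitrary.

```java
// Assumed-for-illustration snippet: give S3A data uploads a longer timeout
// than ordinary requests, using plain Hadoop Configuration APIs.
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

public class PartUploadTimeoutSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // bulk data PUT/POST calls: generous timeout (default is 15m per the commit)
    conf.set("fs.s3a.connection.part.upload.timeout", "30m");
    // every other S3 request keeps the usual, shorter request timeout
    conf.set("fs.s3a.connection.request.timeout", "60s");
    // duration strings with units are parsed; the value resolves to milliseconds
    long uploadMillis = conf.getTimeDuration(
        "fs.s3a.connection.part.upload.timeout", 15 * 60_000L, TimeUnit.MILLISECONDS);
    System.out.println("part upload timeout = " + uploadMillis + " ms");
  }
}
```
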
Steve Loughran
50e6b49e05
HADOOP-19299. HttpReferrerAuditHeader resilience (#7095)
* HttpReferrerAuditHeader is thread safe, copying the lists/maps passed
  in and using synchronized methods when necessary.
* All exceptions raised when building the referrer header are caught
  and swallowed.
* The first such error is logged at warn; all errors, plus their stack
  traces, are logged at debug.

Contributed by Steve Loughran
2024-10-07 13:53:01 +01:00
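
A rough, self-contained sketch of the pattern those bullets describe: defensive copies into thread-safe structures at construction, a synchronized build method, and runtime failures swallowed rather than propagated. The class and field names are invented; this is not the Hadoop HttpReferrerAuditHeader implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class ResilientHeaderSketch {
  private static final AtomicBoolean WARNED_ONCE = new AtomicBoolean(false);

  /** Static attributes, defensively copied so later caller mutation is harmless. */
  private final Map<String, String> attributes;

  /** Lazily evaluated attributes; these suppliers may throw at build time. */
  private final Map<String, Supplier<String>> evaluated;

  ResilientHeaderSketch(Map<String, String> attributes,
      Map<String, Supplier<String>> evaluated) {
    this.attributes = new ConcurrentHashMap<>(attributes);
    this.evaluated = new ConcurrentHashMap<>(evaluated);
  }

  /** Build the header; never propagate a failure to the caller. */
  synchronized String build() {
    try {
      Map<String, String> all = new ConcurrentHashMap<>(attributes);
      evaluated.forEach((k, v) -> all.put(k, v.get()));
      return all.entrySet().stream()
          .map(e -> e.getKey() + "=" + e.getValue())
          .collect(Collectors.joining("&"));
    } catch (RuntimeException e) {
      // swallow: an audit header must never fail the request it decorates.
      if (WARNED_ONCE.compareAndSet(false, true)) {
        System.err.println("WARN: failed to build referrer header: " + e);
      }
      return "";
    }
  }
}
```
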
zhtttylz
1f0d9df887
HDFS-17637. Fix spotbugs in HttpFSFileSystem#getXAttr (#7099) Contributed by Hualong Zhang.
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
2024-10-06 09:16:00 +08:00
Syed Shameerur Rahman
5ea3a1bd0a
HADOOP-19286: S3A: Support cross region access when S3 region/endpoint is set (ADDENDUM) (#7098)
Contributed by Syed Shameerur Rahman
2024-10-04 14:58:53 +01:00
41 changed files with 726 additions and 101 deletions


@@ -1571,12 +1571,21 @@ function hadoop_finalize_hadoop_opts
 ## @description Finish configuring JPMS that enforced for JDK 17 and higher
 ## @description prior to executing Java
+## @description keep this list sync with hadoop-project/pom.xml extraJavaTestArgs
 ## @audience private
 ## @stability evolving
 ## @replaceable yes
 function hadoop_finalize_jpms_opts
 {
   hadoop_add_param HADOOP_OPTS IgnoreUnrecognizedVMOptions "-XX:+IgnoreUnrecognizedVMOptions"
+  hadoop_add_param HADOOP_OPTS open.java.io "--add-opens=java.base/java.io=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.lang "--add-opens=java.base/java.lang=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.lang.reflect "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.math "--add-opens=java.base/java.math=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.net "--add-opens=java.base/java.net=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.text "--add-opens=java.base/java.text=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.util "--add-opens=java.base/java.util=ALL-UNNAMED"
+  hadoop_add_param HADOOP_OPTS open.java.util.concurrent "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED"
   hadoop_add_param HADOOP_OPTS open.java.util.zip "--add-opens=java.base/java.util.zip=ALL-UNNAMED"
   hadoop_add_param HADOOP_OPTS open.sun.security.util "--add-opens=java.base/sun.security.util=ALL-UNNAMED"
   hadoop_add_param HADOOP_OPTS open.sun.security.x509 "--add-opens=java.base/sun.security.x509=ALL-UNNAMED"


@@ -68,7 +68,8 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
+import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
+import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
@@ -320,10 +321,10 @@ public void readVectored(List<? extends FileRange> ranges,
       IntFunction<ByteBuffer> allocate) throws IOException {
     // Validate, but do not pass in a file length as it may change.
-    List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
-        Optional.empty());
+    List<? extends FileRange> sortedRanges = sortRangeList(ranges);
     // Set up all of the futures, so that we can use them if things fail
     for(FileRange range: sortedRanges) {
+      validateRangeRequest(range);
       range.setData(new CompletableFuture<>());
     }
     try {


@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URLEncodedUtils;
@@ -57,6 +59,13 @@
  * {@code org.apache.hadoop.fs.s3a.audit.TestHttpReferrerAuditHeader}
  * so as to verify that header generation in the S3A auditors, and
  * S3 log parsing, all work.
+ * <p>
+ * This header may be shared across multiple threads at the same time.
+ * so some methods are marked as synchronized, specifically those reading
+ * or writing the attribute map.
+ * <p>
+ * For the same reason, maps and lists passed down during construction are
+ * copied into thread safe structures.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -81,6 +90,14 @@ public final class HttpReferrerAuditHeader {
   private static final LogExactlyOnce WARN_OF_URL_CREATION =
       new LogExactlyOnce(LOG);
+  /**
+   * Log for warning of an exception raised when building
+   * the referrer header, including building the evaluated
+   * attributes.
+   */
+  private static final LogExactlyOnce ERROR_BUILDING_REFERRER_HEADER =
+      new LogExactlyOnce(LOG);
   /** Context ID. */
   private final String contextId;
@@ -122,7 +139,11 @@ public final class HttpReferrerAuditHeader {
   /**
    * Instantiate.
-   *
+   * <p>
+   * All maps/enums passed down are copied into thread safe equivalents.
+   * as their origin is unknown and cannot be guaranteed to
+   * not be shared.
+   * <p>
    * Context and operationId are expected to be well formed
    * numeric/hex strings, at least adequate to be
    * used as individual path elements in a URL.
@@ -130,15 +151,15 @@ private HttpReferrerAuditHeader(
   private HttpReferrerAuditHeader(
       final Builder builder) {
     this.contextId = requireNonNull(builder.contextId);
-    this.evaluated = builder.evaluated;
-    this.filter = builder.filter;
+    this.evaluated = new ConcurrentHashMap<>(builder.evaluated);
+    this.filter = ImmutableSet.copyOf(builder.filter);
     this.operationName = requireNonNull(builder.operationName);
     this.path1 = builder.path1;
     this.path2 = builder.path2;
     this.spanId = requireNonNull(builder.spanId);
     // copy the parameters from the builder and extend
-    attributes = builder.attributes;
+    attributes = new ConcurrentHashMap<>(builder.attributes);
     addAttribute(PARAM_OP, operationName);
     addAttribute(PARAM_PATH, path1);
@@ -166,17 +187,18 @@ private HttpReferrerAuditHeader(
    * per entry, and "" returned.
    * @return a referrer string or ""
    */
-  public String buildHttpReferrer() {
+  public synchronized String buildHttpReferrer() {
     String header;
     try {
+      Map<String, String> requestAttrs = new HashMap<>(attributes);
       String queries;
       // Update any params which are dynamically evaluated
       evaluated.forEach((key, eval) ->
-          addAttribute(key, eval.get()));
+          requestAttrs.put(key, eval.get()));
       // now build the query parameters from all attributes, static and
       // evaluated, stripping out any from the filter
-      queries = attributes.entrySet().stream()
+      queries = requestAttrs.entrySet().stream()
           .filter(e -> !filter.contains(e.getKey()))
           .map(e -> e.getKey() + "=" + e.getValue())
           .collect(Collectors.joining("&"));
@@ -189,7 +211,14 @@ public String buildHttpReferrer() {
     } catch (URISyntaxException e) {
       WARN_OF_URL_CREATION.warn("Failed to build URI for auditor: " + e, e);
       header = "";
+    } catch (RuntimeException e) {
+      // do not let failure to build the header stop the request being
+      // issued.
+      ERROR_BUILDING_REFERRER_HEADER.warn("Failed to construct referred header {}", e.toString());
+      LOG.debug("Full stack", e);
+      header = "";
     }
     return header;
   }
@@ -200,7 +229,7 @@ public String buildHttpReferrer() {
    * @param key query key
    * @param value query value
    */
-  private void addAttribute(String key,
+  private synchronized void addAttribute(String key,
       String value) {
     if (StringUtils.isNotEmpty(value)) {
       attributes.put(key, value);


@@ -623,8 +623,13 @@ support -and fallback everywhere else.
 The restriction "no overlapping ranges" was only initially enforced in
 the S3A connector, which would raise `UnsupportedOperationException`.
-Adding the range check as a precondition for all implementations guarantees
-consistent behavior everywhere.
+Adding the range check as a precondition for all implementations (Raw Local
+being an exception) guarantees consistent behavior everywhere.
+The reason Raw Local doesn't have this precondition is ChecksumFileSystem
+creates the chunked ranges based on the checksum chunk size and then calls
+readVectored on Raw Local which may lead to overlapping ranges in some cases.
+For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-19291)
 For reliable use with older hadoop releases with the API: sort the list of ranges
 and check for overlaps before calling `readVectored()`.
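
For the older-release guidance in the last two lines of that spec change, a small sketch of such a pre-flight check. It assumes only the public FileRange accessors getOffset() and getLength(); the class and method names are invented for the example.

```java
// Hedged sketch: sort ranges by offset and reject overlaps before calling
// readVectored() against a release which may not tolerate them.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.fs.FileRange;

public final class RangeOverlapCheck {

  private RangeOverlapCheck() {
  }

  /** Return a sorted copy of the ranges, failing if any two overlap. */
  public static List<FileRange> sortAndCheck(List<? extends FileRange> ranges) {
    List<FileRange> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(FileRange::getOffset));
    for (int i = 1; i < sorted.size(); i++) {
      FileRange prev = sorted.get(i - 1);
      FileRange next = sorted.get(i);
      if (prev.getOffset() + prev.getLength() > next.getOffset()) {
        throw new IllegalArgumentException(
            "Overlapping ranges at offsets " + prev.getOffset()
                + " and " + next.getOffset());
      }
    }
    return sorted;
  }
}
```
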


@@ -270,13 +270,23 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
   }
   /**
-   * Vectored IO doesn't support overlapping ranges.
+   * Most file systems won't support overlapping ranges.
+   * Currently, only Raw Local supports it.
    */
   @Test
   public void testOverlappingRanges() throws Exception {
-    verifyExceptionalVectoredRead(
-        getSampleOverlappingRanges(),
-        IllegalArgumentException.class);
+    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
+      verifyExceptionalVectoredRead(
+          getSampleOverlappingRanges(),
+          IllegalArgumentException.class);
+    } else {
+      try (FSDataInputStream in = openVectorFile()) {
+        List<FileRange> fileRanges = getSampleOverlappingRanges();
+        in.readVectored(fileRanges, allocate);
+        validateVectoredReadResult(fileRanges, DATASET, 0);
+        returnBuffersToPoolPostRead(fileRanges, pool);
+      }
+    }
   }
   /**
@@ -284,9 +294,18 @@ public void testOverlappingRanges() throws Exception {
    */
   @Test
   public void testSameRanges() throws Exception {
-    verifyExceptionalVectoredRead(
-        getSampleSameRanges(),
-        IllegalArgumentException.class);
+    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
+      verifyExceptionalVectoredRead(
+          getSampleSameRanges(),
+          IllegalArgumentException.class);
+    } else {
+      try (FSDataInputStream in = openVectorFile()) {
+        List<FileRange> fileRanges = getSampleSameRanges();
+        in.readVectored(fileRanges, allocate);
+        validateVectoredReadResult(fileRanges, DATASET, 0);
+        returnBuffersToPoolPostRead(fileRanges, pool);
+      }
+    }
   }
   /**
@@ -329,10 +348,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception {
   public void testConsecutiveRanges() throws Exception {
     List<FileRange> fileRanges = new ArrayList<>();
     final int offset = 500;
-    final int length = 100;
+    final int length = 2011;
     range(fileRanges, offset, length);
-    range(fileRanges, 600, 200);
-    range(fileRanges, 800, 100);
+    range(fileRanges, offset + length, length);
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET, 0);


@@ -261,4 +261,6 @@ public interface ContractOptions {
    * Does vector read check file length on open rather than in the read call?
    */
   String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";
+
+  String VECTOR_IO_OVERLAPPING_RANGES = "vector-io-overlapping-ranges";
 }


@@ -142,4 +142,9 @@
     <value>true</value>
   </property>
+
+  <property>
+    <name>fs.contract.vector-io-overlapping-ranges</name>
+    <value>true</value>
+  </property>
 </configuration>


@@ -233,7 +233,7 @@
       <configuration>
         <reuseForks>false</reuseForks>
         <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
-        <argLine>-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
+        <argLine>${maven-surefire-plugin.argLine} -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError</argLine>
         <environmentVariables>
           <!-- HADOOP_HOME required for tests on Windows to find winutils -->
           <HADOOP_HOME>${hadoop.common.build.dir}</HADOOP_HOME>


@@ -1370,7 +1370,7 @@ public byte[] getXAttr(Path f, String name) throws IOException {
     JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
     Map<String, byte[]> xAttrs = createXAttrMap(
         (JSONArray) json.get(XATTRS_JSON));
-    return xAttrs != null ? xAttrs.get(name) : null;
+    return xAttrs.get(name);
   }
   /** Convert xAttrs json to xAttrs map */


@@ -193,6 +193,8 @@ protected FSEditLogOp nextOp() throws IOException {
         } catch (IOException e) {
           prevException = e;
           state = State.STREAM_FAILED;
+          LOG.warn("Got error skipUntil edit log input stream {}.", streams[curIdx].getName());
+          break;
         }
         state = State.OK;
         break;


@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRedundantEditLogInputStream {
+  private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
+
+  @Test
+  public void testNextOp() throws IOException {
+    EditLogInputStream fakeStream1 = mock(EditLogInputStream.class);
+    EditLogInputStream fakeStream2 = mock(EditLogInputStream.class);
+    ArrayList<EditLogInputStream> list = new ArrayList();
+    list.add(fakeStream1);
+    list.add(fakeStream2);
+    for (int i = 0; i < list.size(); i++) {
+      EditLogInputStream stream = list.get(i);
+      when(stream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME + i);
+      when(stream.getFirstTxId()).thenReturn(1L);
+      when(stream.getLastTxId()).thenReturn(2L);
+      when(stream.length()).thenReturn(1L);
+    }
+    when(fakeStream1.skipUntil(1)).thenThrow(new IOException("skipUntil failed."));
+    when(fakeStream2.skipUntil(1)).thenReturn(true);
+    FSEditLogOp op = new MkdirOp();
+    op.setTransactionId(100);
+    when(fakeStream2.readOp()).thenReturn(op);
+
+    LogCapturer capture = LogCapturer.captureLogs(RedundantEditLogInputStream.LOG);
+    RedundantEditLogInputStream redundantEditLogInputStream =
+        new RedundantEditLogInputStream(list, 1);
+    FSEditLogOp returnOp = redundantEditLogInputStream.nextOp();
+
+    String log = capture.getOutput();
+    assertTrue(log.contains("Got error skipUntil edit log input stream FAKE_STREAM0"));
+    assertTrue(log.contains("Got error reading edit log input stream FAKE_STREAM0; "
+        + "failing over to edit log FAKE_STREAM1"));
+    assertEquals(op, returnOp);
+  }
+}


@@ -302,6 +302,8 @@ public synchronized void close() throws IOException {
     try {
       if (in != null) {
         in.close();
+      } else if (fileIn != null) {
+        fileIn.close();
       }
     } finally {
       if (decompressor != null) {


@@ -99,45 +99,50 @@ public void initialize(InputSplit genericSplit,
         MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
     fileIn = FutureIO.awaitFuture(builder.build());
-    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
-    if (null!=codec) {
-      isCompressedInput = true;
-      decompressor = CodecPool.getDecompressor(codec);
-      if (codec instanceof SplittableCompressionCodec) {
-        final SplitCompressionInputStream cIn =
-          ((SplittableCompressionCodec)codec).createInputStream(
-            fileIn, decompressor, start, end,
-            SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        in = new CompressedSplitLineReader(cIn, job,
-            this.recordDelimiterBytes);
-        start = cIn.getAdjustedStart();
-        end = cIn.getAdjustedEnd();
-        filePosition = cIn;
-      } else {
-        if (start != 0) {
-          // So we have a split that is only part of a file stored using
-          // a Compression codec that cannot be split.
-          throw new IOException("Cannot seek in " +
-              codec.getClass().getSimpleName() + " compressed stream");
-        }
-        in = new SplitLineReader(codec.createInputStream(fileIn,
-            decompressor), job, this.recordDelimiterBytes);
-        filePosition = fileIn;
-      }
-    } else {
-      fileIn.seek(start);
-      in = new UncompressedSplitLineReader(
-          fileIn, job, this.recordDelimiterBytes, split.getLength());
-      filePosition = fileIn;
-    }
-    // If this is not the first split, we always throw away first record
-    // because we always (except the last split) read one extra line in
-    // next() method.
-    if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
-    }
-    this.pos = start;
+    try {
+      CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+      if (null!=codec) {
+        isCompressedInput = true;
+        decompressor = CodecPool.getDecompressor(codec);
+        if (codec instanceof SplittableCompressionCodec) {
+          final SplitCompressionInputStream cIn =
+            ((SplittableCompressionCodec)codec).createInputStream(
+              fileIn, decompressor, start, end,
+              SplittableCompressionCodec.READ_MODE.BYBLOCK);
+          in = new CompressedSplitLineReader(cIn, job,
+              this.recordDelimiterBytes);
+          start = cIn.getAdjustedStart();
+          end = cIn.getAdjustedEnd();
+          filePosition = cIn;
+        } else {
+          if (start != 0) {
+            // So we have a split that is only part of a file stored using
+            // a Compression codec that cannot be split.
+            throw new IOException("Cannot seek in " +
+                codec.getClass().getSimpleName() + " compressed stream");
+          }
+          in = new SplitLineReader(codec.createInputStream(fileIn,
+              decompressor), job, this.recordDelimiterBytes);
+          filePosition = fileIn;
+        }
+      } else {
+        fileIn.seek(start);
+        in = new UncompressedSplitLineReader(
+            fileIn, job, this.recordDelimiterBytes, split.getLength());
+        filePosition = fileIn;
+      }
+      // If this is not the first split, we always throw away first record
+      // because we always (except the last split) read one extra line in
+      // next() method.
+      if (start != 0) {
+        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      }
+      this.pos = start;
+    } catch (Exception e) {
+      fileIn.close();
+      throw e;
+    }
   }


@@ -167,8 +167,19 @@
     <enforced.java.version>[${javac.version},)</enforced.java.version>
     <enforced.maven.version>[3.3.0,)</enforced.maven.version>
+    <!-- keep this list sync with
+         hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh#hadoop_finalize_jpms_opts
+    -->
     <extraJavaTestArgs>
       -XX:+IgnoreUnrecognizedVMOptions
+      --add-opens=java.base/java.io=ALL-UNNAMED
+      --add-opens=java.base/java.lang=ALL-UNNAMED
+      --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+      --add-opens=java.base/java.math=ALL-UNNAMED
+      --add-opens=java.base/java.net=ALL-UNNAMED
+      --add-opens=java.base/java.text=ALL-UNNAMED
+      --add-opens=java.base/java.util=ALL-UNNAMED
+      --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
       --add-opens=java.base/java.util.zip=ALL-UNNAMED
       --add-opens=java.base/sun.security.util=ALL-UNNAMED
       --add-opens=java.base/sun.security.x509=ALL-UNNAMED
@@ -2743,6 +2754,16 @@
       </dependencies>
     </dependencyManagement>
   </profile>
+  <!-- We added this profile to support compilation for JDK 9 and above. -->
+  <profile>
+    <id>java9</id>
+    <activation>
+      <jdk>[9,)</jdk>
+    </activation>
+    <properties>
+      <maven.compiler.release>${javac.version}</maven.compiler.release>
+    </properties>
+  </profile>
 </profiles>
 <repositories>


@@ -398,6 +398,21 @@ private Constants() {
   public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
       Duration.ofSeconds(60);
+
+  /**
+   * Timeout for uploading all of a small object or a single part
+   * of a larger one.
+   * {@value}.
+   * Default unit is milliseconds for consistency with other options.
+   */
+  public static final String PART_UPLOAD_TIMEOUT =
+      "fs.s3a.connection.part.upload.timeout";
+
+  /**
+   * Default part upload timeout: 15 minutes.
+   */
+  public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
+      Duration.ofMinutes(15);
   /**
    * Should TCP Keepalive be enabled on the socket?
    * This adds some network IO, but finds failures faster.


@@ -271,8 +271,10 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
    * <li>If endpoint is configured via via fs.s3a.endpoint, set it.
    * If no region is configured, try to parse region from endpoint. </li>
    * <li> If no region is configured, and it could not be parsed from the endpoint,
-   * set the default region as US_EAST_2 and enable cross region access. </li>
+   * set the default region as US_EAST_2</li>
    * <li> If configured region is empty, fallback to SDK resolution chain. </li>
+   * <li> S3 cross region is enabled by default irrespective of region or endpoint
+   * is set or not.</li>
    * </ol>
    *
    * @param builder S3 client builder.


@@ -1286,6 +1286,13 @@ protected RequestFactory createRequestFactory() {
           STORAGE_CLASS);
     }
+    // optional custom timeout for bulk uploads
+    Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
+        PART_UPLOAD_TIMEOUT,
+        DEFAULT_PART_UPLOAD_TIMEOUT,
+        TimeUnit.MILLISECONDS,
+        Duration.ZERO);
+
     return RequestFactoryImpl.builder()
         .withBucket(requireNonNull(bucket))
         .withCannedACL(getCannedACL())
@@ -1295,6 +1302,7 @@ protected RequestFactory createRequestFactory() {
         .withContentEncoding(contentEncoding)
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
+        .withPartUploadTimeout(partUploadTimeout)
         .build();
   }


@@ -700,7 +700,8 @@ public SdkResponse modifyResponse(Context.ModifyResponse context,
    * span is deactivated.
    * Package-private for testing.
    */
-  private final class WrappingAuditSpan extends AbstractAuditSpanImpl {
+  @VisibleForTesting
+  final class WrappingAuditSpan extends AbstractAuditSpanImpl {
     /**
      * Inner span.
@@ -792,6 +793,15 @@ public boolean isValidSpan() {
       return isValid && span.isValidSpan();
     }
+    /**
+     * Get the inner span.
+     * @return the span.
+     */
+    @VisibleForTesting
+    AuditSpanS3A getSpan() {
+      return span;
+    }
     /**
      * Forward to the inner span.
      * {@inheritDoc}


@@ -38,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.audit.AuditConstants;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
@@ -252,6 +253,17 @@ private void setLastHeader(final String lastHeader) {
     this.lastHeader = lastHeader;
   }
+  /**
+   * Get the referrer provided the span is an instance or
+   * subclass of LoggingAuditSpan.
+   * @param span span
+   * @return the referrer
+   * @throws ClassCastException if a different span type was passed in
+   */
+  @VisibleForTesting
+  HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
+    return ((LoggingAuditSpan) span).getReferrer();
+  }
   /**
    * Span which logs at debug and sets the HTTP referrer on
    * invocations.
@@ -441,10 +453,10 @@ public String toString() {
     }
     /**
-     * Get the referrer; visible for tests.
+     * Get the referrer.
      * @return the referrer.
      */
-    HttpReferrerAuditHeader getReferrer() {
+    private HttpReferrerAuditHeader getReferrer() {
       return referrer;
     }


@@ -26,6 +26,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.retry.RetryMode;
@@ -623,4 +625,24 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
         socketTimeout);
   }
+  /**
+   * Set a custom ApiCallTimeout for a single request.
+   * This allows for a longer timeout to be used in data upload
+   * requests than that for all other S3 interactions;
+   * This does not happen by default in the V2 SDK
+   * (see HADOOP-19295).
+   * <p>
+   * If the timeout is zero, the request is not patched.
+   * @param builder builder to patch.
+   * @param timeout timeout
+   */
+  public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
+    if (!timeout.isZero()) {
+      builder.overrideConfiguration(
+          AwsRequestOverrideConfiguration.builder()
+              .apiCallTimeout(timeout)
+              .apiCallAttemptTimeout(timeout)
+              .build());
+    }
+  }
 }


@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.s3a.impl;
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +60,9 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final boolean isMultipartUploadEnabled;
+  /**
+   * Timeout for uploading objects/parts.
+   * This will be set on data put/post operations only.
+   */
+  private final Duration partUploadTimeout;
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -142,6 +151,7 @@ protected RequestFactoryImpl(
     this.contentEncoding = builder.contentEncoding;
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+    this.partUploadTimeout = builder.partUploadTimeout;
   }
   /**
@@ -344,6 +354,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
       putObjectRequestBuilder.storageClass(storageClass);
     }
+    // Set the timeout for object uploads but not directory markers.
+    if (!isDirectoryMarker) {
+      setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
+    }
     return prepareRequest(putObjectRequestBuilder);
   }
@@ -595,6 +610,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
         .partNumber(partNumber)
         .contentLength(size);
     uploadPartEncryptionParameters(builder);
+
+    // Set the request timeout for the part upload
+    setRequestTimeout(builder, partUploadTimeout);
     return prepareRequest(builder);
   }
@@ -702,6 +720,13 @@ public static final class RequestFactoryBuilder {
      */
     private boolean isMultipartUploadEnabled = true;
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout"
+     */
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
     private RequestFactoryBuilder() {
     }
@@ -799,6 +824,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled(
       this.isMultipartUploadEnabled = value;
       return this;
     }
+
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout"
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
+      partUploadTimeout = value;
+      return this;
+    }
   }
   /**


@@ -26,6 +26,7 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream>
      */
     private T currentStream;
+    /**
+     * When did this upload start?
+     * Use in error messages.
+     */
+    private final LocalDateTime startTime;
     /**
      * Constructor.
      * @param size size of the data. Must be non-negative.
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
       checkArgument(size >= 0, "size is negative: %s", size);
       this.size = size;
       this.isOpen = isOpen;
+      this.startTime = LocalDateTime.now();
     }
     /**
@@ -274,8 +282,11 @@ public final InputStream newStream() {
       close();
       checkOpen();
       streamCreationCount++;
-      if (streamCreationCount > 1) {
-        LOG.info("Stream created more than once: {}", this);
+      if (streamCreationCount == 2) {
+        // the stream has been recreated for the first time.
+        // notify only once for this stream, so as not to flood
+        // the logs.
+        LOG.info("Stream recreated: {}", this);
       }
       return setCurrentStream(createNewStream());
     }
@@ -302,6 +313,14 @@ public int getSize() {
       return size;
     }
+    /**
+     * When did this upload start?
+     * @return start time
+     */
+    public LocalDateTime getStartTime() {
+      return startTime;
+    }
     /**
      * Current stream.
      * When {@link #newStream()} is called, this is set to the new value,
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) {
     public String toString() {
       return "BaseContentProvider{" +
           "size=" + size +
+          ", initiated at " + startTime +
           ", streamCreationCount=" + streamCreationCount +
           ", currentStream=" + currentStream +
           '}';


@@ -356,6 +356,10 @@ public void testWithOutCrossRegionAccess() throws Exception {
     // skip the test if the region is sa-east-1
     skipCrossRegionTest();
     final Configuration newConf = new Configuration(getConfiguration());
+    removeBaseAndBucketOverrides(newConf,
+        ENDPOINT,
+        AWS_S3_CROSS_REGION_ACCESS_ENABLED,
+        AWS_REGION);
     // disable cross region access
     newConf.setBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, false);
     newConf.set(AWS_REGION, SA_EAST_1);
@@ -374,6 +378,7 @@ public void testWithCrossRegionAccess() throws Exception {
     skipCrossRegionTest();
     final Configuration newConf = new Configuration(getConfiguration());
     removeBaseAndBucketOverrides(newConf,
+        ENDPOINT,
         AWS_S3_CROSS_REGION_ACCESS_ENABLED,
         AWS_REGION);
     // enable cross region access


@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);


@@ -54,6 +54,7 @@
 import org.apache.hadoop.util.Progressable;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
       .withRequestPreparer(MockS3AFileSystem::prepareRequest)
       .withBucket(BUCKET)
       .withEncryptionSecrets(new EncryptionSecrets())
+      .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
       .build();
   /**


@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.regex.Matcher;
+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.http.SdkHttpRequest;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
+import org.apache.hadoop.fs.s3a.audit.impl.ReferrerExtractor;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
@@ -417,4 +419,33 @@ private void expectStrippedField(final String str,
         .describedAs("Stripped <%s>", str)
         .isEqualTo(ex);
   }
+
+  /**
+   * Verify that exceptions raised when building referrer headers
+   * do not result in failures, just an empty header.
+   */
+  @Test
+  public void testSpanResilience() throws Throwable {
+    final CommonAuditContext auditContext = CommonAuditContext.currentAuditContext();
+    final String failing = "failing";
+    auditContext.put(failing, () -> {
+      throw new RuntimeException("raised");
+    });
+    try {
+      final HttpReferrerAuditHeader referrer = ReferrerExtractor.getReferrer(auditor, span());
+      Assertions.assertThat(referrer.buildHttpReferrer())
+          .describedAs("referrer header")
+          .isBlank();
+      // repeat
+      LOG.info("second attempt: there should be no second warning below");
+      Assertions.assertThat(referrer.buildHttpReferrer())
+          .describedAs("referrer header 2")
+          .isBlank();
+      referrer.buildHttpReferrer();
+    } finally {
+      // critical to remove this so it doesn't interfere with any other
+      // tests
+      auditContext.remove(failing);
+    }
+  }
 }


@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.impl;
+
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
+
+/**
+ * Extract the referrer from a LoggingAuditor through a package-private
+ * method.
+ */
+public final class ReferrerExtractor {
+
+  private ReferrerExtractor() {
+  }
+
+  /**
+   * Get the referrer provided the span is an instance or
+   * subclass of LoggingAuditSpan.
+   * If wrapped by a {@code WrappingAuditSpan}, it will be extracted.
+   * @param auditor the auditor.
+   * @param span span
+   * @return the referrer
+   * @throws ClassCastException if a different span type was passed in
+   */
+  public static HttpReferrerAuditHeader getReferrer(LoggingAuditor auditor,
+      AuditSpanS3A span) {
+    AuditSpanS3A sp;
+    if (span instanceof ActiveAuditManagerS3A.WrappingAuditSpan) {
+      sp = ((ActiveAuditManagerS3A.WrappingAuditSpan) span).getSpan();
+    } else {
+      sp = span;
+    }
+    return auditor.getReferrer(sp);
+  }
+}


@@ -153,7 +153,7 @@ public Configuration createConfiguration() {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
@@ -161,7 +161,7 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.teardown();
   }


@ -21,7 +21,9 @@
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -33,8 +35,12 @@
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.test.SdkFaultInjector;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.OperationDuration;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
@ -42,16 +48,19 @@
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS; import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES; import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT; import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis; import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -63,7 +72,7 @@
* The likely cause is actually -Dprefetch test runs as these return connections to * The likely cause is actually -Dprefetch test runs as these return connections to
* the pool. * the pool.
* However, it is also important to have a non-brittle FS for creating the test file * However, it is also important to have a non-brittle FS for creating the test file
* and teardow, again, this makes for a flaky test.. * and teardown, again, this makes for a flaky test.
*/ */
public class ITestConnectionTimeouts extends AbstractS3ATestBase { public class ITestConnectionTimeouts extends AbstractS3ATestBase {
@ -72,6 +81,23 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
*/ */
public static final int FILE_SIZE = 1024; public static final int FILE_SIZE = 1024;
public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
public static final Duration UPLOAD_DURATION = Duration.ofSeconds(15);
@Override
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
PART_UPLOAD_TIMEOUT);
setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, UPLOAD_DURATION);
// set this so teardown will clean pending uploads.
conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
return conf;
}
/** /**
* Create a configuration for an FS which has timeouts set to very low values * Create a configuration for an FS which has timeouts set to very low values
* and no retries. * and no retries.
@ -86,6 +112,7 @@ private Configuration timingOutConfiguration() {
ESTABLISH_TIMEOUT, ESTABLISH_TIMEOUT,
MAX_ERROR_RETRIES, MAX_ERROR_RETRIES,
MAXIMUM_CONNECTIONS, MAXIMUM_CONNECTIONS,
PART_UPLOAD_TIMEOUT,
PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_KEY,
REQUEST_TIMEOUT, REQUEST_TIMEOUT,
SOCKET_TIMEOUT, SOCKET_TIMEOUT,
@ -118,7 +145,6 @@ public void teardown() throws Exception {
*/ */
@Test @Test
public void testGeneratePoolTimeouts() throws Throwable { public void testGeneratePoolTimeouts() throws Throwable {
byte[] data = dataset(FILE_SIZE, '0', 10);
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
Configuration conf = timingOutConfiguration(); Configuration conf = timingOutConfiguration();
Path path = methodPath(); Path path = methodPath();
@ -127,7 +153,7 @@ public void testGeneratePoolTimeouts() throws Throwable {
final S3AFileSystem fs = getFileSystem(); final S3AFileSystem fs = getFileSystem();
// create the test file using the good fs, to avoid connection timeouts // create the test file using the good fs, to avoid connection timeouts
// during setup. // during setup.
ContractTestUtils.createFile(fs, path, true, data); ContractTestUtils.createFile(fs, path, true, DATASET);
final FileStatus st = fs.getFileStatus(path); final FileStatus st = fs.getFileStatus(path);
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) { try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
intercept(ConnectTimeoutException.class, () -> { intercept(ConnectTimeoutException.class, () -> {
@ -148,4 +174,102 @@ public void testGeneratePoolTimeouts() throws Throwable {
}); });
} }
} }
/**
* Verify that different timeouts are used for object upload operations.
* The PUT operation can take longer than the value set as the
* connection.request.timeout, but other operations (GET) will
* fail.
* <p>
* This test tries to balance "being fast" with "not failing assertions
* in parallel test runs".
*/
@Test
public void testObjectUploadTimeouts() throws Throwable {
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
final Path dir = methodPath();
Path file = new Path(dir, "file");
Configuration conf = new Configuration(getConfiguration());
removeBaseAndBucketOverrides(conf,
PART_UPLOAD_TIMEOUT,
REQUEST_TIMEOUT,
FS_S3A_PERFORMANCE_FLAGS
);
// skip all checks
conf.set(FS_S3A_PERFORMANCE_FLAGS, PerformanceFlagEnum.Create.name());
final int uploadTimeout = 10;
// uploads have a long timeout
final Duration uploadDuration = Duration.ofSeconds(uploadTimeout);
setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, uploadDuration);
// other requests a short one
final Duration shortTimeout = Duration.ofSeconds(5);
setDurationAsMillis(conf, REQUEST_TIMEOUT, shortTimeout);
setDurationAsMillis(conf, CONNECTION_ACQUISITION_TIMEOUT, shortTimeout);
conf.setInt(RETRY_LIMIT, 0);
SdkFaultInjector.resetFaultInjector();
// total sleep time is tracked for extra assertions
final AtomicLong totalSleepTime = new AtomicLong(0);
// fault injector is set to sleep for a bit less than the upload timeout.
final long sleepTime = uploadDuration.toMillis() - 2000;
SdkFaultInjector.setAction((req, resp) -> {
totalSleepTime.addAndGet(sleepTime);
LOG.info("sleeping {} millis", sleepTime);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ignored) {
}
return resp;
});
SdkFaultInjector.setRequestFailureConditions(999,
SdkFaultInjector::isPutRequest);
SdkFaultInjector.addFaultInjection(conf);
final S3AFileSystem fs = getFileSystem();
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
OperationDuration dur = new DurationInfo(LOG, "Creating File");
ContractTestUtils.createFile(brittleFS, file, true, DATASET);
dur.finished();
Assertions.assertThat(totalSleepTime.get())
.describedAs("total sleep time of PUT")
.isGreaterThan(0);
Assertions.assertThat(dur.asDuration())
.describedAs("Duration of write")
.isGreaterThan(shortTimeout)
.isLessThan(uploadDuration);
// reading the file will fail because the injected sleep outlasts the short timeout
totalSleepTime.set(0);
LOG.debug("attempting read");
SdkFaultInjector.setRequestFailureConditions(999,
SdkFaultInjector::isGetRequest);
// the exact IOE depends on what failed; if it is in the http read it will be a
// software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
// which is too low level to safely assert about.
// it can also surface as an UncheckedIOException wrapping the inner cause.
intercept(Exception.class, () ->
ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
Assertions.assertThat(totalSleepTime.get())
.describedAs("total sleep time of read")
.isGreaterThan(0);
// and try a multipart upload to verify that its requests also outlast
// the short request timeout
SdkFaultInjector.setRequestFailureConditions(999,
SdkFaultInjector::isPartUpload);
Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
totalSleepTime.set(0);
OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
dur2.finished();
Assertions.assertThat(totalSleepTime.get())
.describedAs("total sleep time of magic write")
.isGreaterThan(0);
Assertions.assertThat(dur2.asDuration())
.describedAs("Duration of magic write")
.isGreaterThan(shortTimeout);
brittleFS.delete(dir, true);
}
}
} }
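As a quick illustration of the option exercised by this test, a minimal client-side sketch; the bucket URI, local path and timeout values are placeholders, not values taken from the patch:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PartUploadTimeoutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // keep the general request timeout short so stuck GET/HEAD calls fail fast
    conf.set("fs.s3a.connection.request.timeout", "60s");
    // give data upload PUT/POST calls a much longer budget
    conf.set("fs.s3a.connection.part.upload.timeout", "15m");
    try (FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
      fs.copyFromLocalFile(new Path("/tmp/local.bin"), new Path("/uploads/local.bin"));
    }
  }
}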

View File

@ -19,16 +19,21 @@
package org.apache.hadoop.fs.s3a.impl; package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.awscore.AwsRequest; import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkRequest; import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Request;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIOException;
@ -38,6 +43,7 @@
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.test.AbstractHadoopTestBase; import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
.isEqualTo(acl); .isEqualTo(acl);
} }
/** /**
* Now add a processor and verify that it was invoked for * Now add a processor and verify that it was invoked for
* exactly as many requests as were analyzed. * exactly as many requests as were analyzed.
@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
.isEqualTo(requestsAnalyzed); .isEqualTo(requestsAnalyzed);
} }
/**
* Assertion for Request timeouts.
* @param duration expected duration.
* @param request request.
*/
private void assertApiTimeouts(Duration duration, S3Request request) {
Assertions.assertThat(request.overrideConfiguration())
.describedAs("request %s", request)
.isNotEmpty();
final AwsRequestOverrideConfiguration override =
request.overrideConfiguration().get();
Assertions.assertThat(override.apiCallAttemptTimeout())
.describedAs("apiCallAttemptTimeout")
.hasValue(duration);
Assertions.assertThat(override.apiCallTimeout())
.describedAs("apiCallTimeout")
.hasValue(duration);
}
/**
* If not overridden, timeouts are set to the default part upload timeout.
*/
@Test
public void testDefaultUploadTimeouts() throws Throwable {
RequestFactory factory = RequestFactoryImpl.builder()
.withBucket("bucket")
.withMultipartPartCountLimit(2)
.build();
final UploadPartRequest upload =
factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
}
/**
* Verify that when upload request timeouts are set,
* they are passed down.
*/
@Test
public void testUploadTimeouts() throws Throwable {
Duration partDuration = Duration.ofDays(1);
RequestFactory factory = RequestFactoryImpl.builder()
.withBucket("bucket")
.withPartUploadTimeout(partDuration)
.build();
String path = "path";
// A simple PUT
final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
PutObjectOptions.deletingDirs(), 1024, false).build();
assertApiTimeouts(partDuration, put);
// multipart part
final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
"1", 3, 128_000_000)
.build();
assertApiTimeouts(partDuration, upload);
}
} }
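For context, a rough sketch of the per-request override these assertions look for, written directly against the AWS SDK v2 builder API rather than the Hadoop request factory; the bucket, key and upload id are placeholders:

import java.time.Duration;

import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

public class UploadTimeoutSketch {
  static UploadPartRequest uploadPartWithTimeout(Duration partTimeout) {
    AwsRequestOverrideConfiguration override = AwsRequestOverrideConfiguration.builder()
        .apiCallTimeout(partTimeout)          // whole-call budget for this request only
        .apiCallAttemptTimeout(partTimeout)   // per-attempt budget for this request only
        .build();
    return UploadPartRequest.builder()
        .bucket("bucket")
        .key("path")
        .uploadId("id")
        .partNumber(3)
        .overrideConfiguration(override)
        .build();
  }
}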

View File

@ -166,7 +166,7 @@ protected Configuration createScaleConfiguration() {
*/ */
@Override @Override
public void setup() throws Exception { public void setup() throws Exception {
SdkFaultInjector.resetEvaluator(); SdkFaultInjector.resetFaultInjector();
super.setup(); super.setup();
} }
@ -174,7 +174,7 @@ public void setup() throws Exception {
public void teardown() throws Exception { public void teardown() throws Exception {
// safety check in case the evaluation is failing any // safety check in case the evaluation is failing any
// request needed in cleanup. // request needed in cleanup.
SdkFaultInjector.resetEvaluator(); SdkFaultInjector.resetFaultInjector();
super.teardown(); super.teardown();
} }

View File

@ -28,6 +28,7 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@ -78,12 +79,13 @@ protected Configuration createScaleConfiguration() {
MIN_MULTIPART_THRESHOLD, MIN_MULTIPART_THRESHOLD,
MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOADS_ENABLED,
MULTIPART_SIZE, MULTIPART_SIZE,
PART_UPLOAD_TIMEOUT,
REQUEST_TIMEOUT); REQUEST_TIMEOUT);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT); conf.set(PART_UPLOAD_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
return conf; return conf;
} }
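A hedged reading of this one-line change: with multipart uploads disabled, the single whole-object PUT is now bounded by the part-upload timeout rather than the general request timeout. A minimal configuration sketch, with keys as named by the constants above and illustrative values:

import org.apache.hadoop.conf.Configuration;

final class SinglePutTimeoutSketch {
  static Configuration singlePutConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false); // one PUT per file
    conf.set("fs.s3a.connection.part.upload.timeout", "15m");   // bounds that single PUT
    return conf;
  }
}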

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.test; package org.apache.hadoop.fs.s3a.test;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -35,6 +36,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor; import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions; import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
@ -77,6 +79,13 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
*/ */
private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW; private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW;
/**
* Action to take on failure.
*/
private static BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse>
action = SdkFaultInjector::patchStatusCode;
/** /**
* Update the value of {@link #FAILURE_STATUS_CODE}. * Update the value of {@link #FAILURE_STATUS_CODE}.
* @param value new value * @param value new value
@ -97,10 +106,14 @@ public static void setEvaluator(Function<Context.ModifyHttpResponse, Boolean> va
/** /**
* Reset the evaluator to enable everything. * Reset fault injection.
* The evaluator will enable everything;
* the failure action is set to
* {@link #patchStatusCode(SdkRequest, SdkHttpResponse)}.
*/ */
public static void resetEvaluator() { public static void resetFaultInjector() {
setEvaluator(ALWAYS_ALLOW); setEvaluator(ALWAYS_ALLOW);
setAction(SdkFaultInjector::patchStatusCode);
} }
/** /**
@ -123,6 +136,23 @@ public static void setRequestFailureConditions(final int attempts,
setEvaluator(condition); setEvaluator(condition);
} }
/**
* Set the action to invoke.
* @param action new action.
*/
public static void setAction(BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse> action) {
SdkFaultInjector.action = requireNonNull(action);
}
/**
* Is the response being processed from a GET request?
* @param context request context.
* @return true if the request is of the right type.
*/
public static boolean isGetRequest(final Context.ModifyHttpResponse context) {
return context.httpRequest().method().equals(SdkHttpMethod.GET);
}
/** /**
* Is the response being processed from a PUT request? * Is the response being processed from a PUT request?
* @param context request context. * @param context request context.
@ -168,6 +198,8 @@ public static boolean isMultipartAbort(final Context.ModifyHttpResponse context)
return context.request() instanceof AbortMultipartUploadRequest; return context.request() instanceof AbortMultipartUploadRequest;
} }
/** /**
* Review response from S3 and optionally modify its status code. * Review response from S3 and optionally modify its status code.
* @return the original response or a copy with a different status code. * @return the original response or a copy with a different status code.
@ -179,14 +211,7 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
SdkHttpResponse httpResponse = context.httpResponse(); SdkHttpResponse httpResponse = context.httpResponse();
if (evaluator.apply(context) && shouldFail()) { if (evaluator.apply(context) && shouldFail()) {
// fail the request return action.apply(request, httpResponse);
final int code = FAILURE_STATUS_CODE.get();
LOG.info("Fault Injector returning {} error code for request {}",
code, request);
return httpResponse.copy(b -> {
b.statusCode(code);
});
} else { } else {
// pass unchanged // pass unchanged
@ -194,6 +219,25 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
} }
} }
/**
* The default fault injector: patch the status code with the value in
* {@link #FAILURE_STATUS_CODE}.
* @param request original request
* @param httpResponse ongoing response
* @return modified response.
*/
public static SdkHttpResponse patchStatusCode(final SdkRequest request,
final SdkHttpResponse httpResponse) {
// fail the request
final int code = FAILURE_STATUS_CODE.get();
LOG.info("Fault Injector returning {} error code for request {}",
code, request);
return httpResponse.copy(b -> {
b.statusCode(code);
});
}
/** /**
* Should the request fail based on the failure count? * Should the request fail based on the failure count?
* @return true if the request count means a request must fail * @return true if the request count means a request must fail
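A minimal usage sketch of the reworked injector, mirroring the connection-timeout test earlier in this change set; the five-second delay, the wrapper class name and the passed-in configuration are illustrative, only the SdkFaultInjector calls come from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.test.SdkFaultInjector;

final class SlowPutSketch {
  static Configuration slowPuts(Configuration conf) {
    SdkFaultInjector.resetFaultInjector();
    SdkFaultInjector.setAction((request, response) -> {
      try {
        Thread.sleep(5_000);          // hold the PUT long enough to exercise the timeout
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return response;                // hand the response back unchanged
    });
    // intercept PUT requests only, as in the test above
    SdkFaultInjector.setRequestFailureConditions(999, SdkFaultInjector::isPutRequest);
    SdkFaultInjector.addFaultInjection(conf);
    return conf;
  }
}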

View File

@ -143,7 +143,7 @@
<forkCount>1</forkCount> <forkCount>1</forkCount>
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<argLine>-Xmx1024m</argLine> <argLine>${maven-surefire-plugin.argLine} -Xmx1024m</argLine>
<includes> <includes>
<include>**/Test*.java</include> <include>**/Test*.java</include>
</includes> </includes>

View File

@ -147,7 +147,7 @@
<forkCount>1</forkCount> <forkCount>1</forkCount>
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<argLine>-Xmx1024m</argLine> <argLine>${maven-surefire-plugin.argLine} -Xmx1024m</argLine>
<includes> <includes>
<include>**/Test*.java</include> <include>**/Test*.java</include>
</includes> </includes>

View File

@ -170,6 +170,9 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptionAllowed, boolean readOnly) { Resource totalPreemptionAllowed, boolean readOnly) {
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer == null) {
return null;
}
Resource available = Resources.clone(node.getUnallocatedResource()); Resource available = Resources.clone(node.getUnallocatedResource());
Resource totalSelected = Resources.createResource(0); Resource totalSelected = Resources.createResource(0);
List<RMContainer> sortedRunningContainers = List<RMContainer> sortedRunningContainers =

View File

@ -876,10 +876,9 @@ private void completeOustandingUpdatesWhichAreReserved(
RMContainer rmContainer, ContainerStatus containerStatus, RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) { RMContainerEventType event) {
N schedulerNode = getSchedulerNode(rmContainer.getNodeId()); N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
if (schedulerNode != null && if (schedulerNode != null) {
schedulerNode.getReservedContainer() != null) {
RMContainer resContainer = schedulerNode.getReservedContainer(); RMContainer resContainer = schedulerNode.getReservedContainer();
if (resContainer.getReservedSchedulerKey() != null) { if (resContainer != null && resContainer.getReservedSchedulerKey() != null) {
ContainerId containerToUpdate = resContainer ContainerId containerToUpdate = resContainer
.getReservedSchedulerKey().getContainerToUpdate(); .getReservedSchedulerKey().getContainerToUpdate();
if (containerToUpdate != null && if (containerToUpdate != null &&

View File

@ -858,12 +858,13 @@ private ContainerAllocation allocate(Resource clusterResource,
FiCaSchedulerNode node = iter.next(); FiCaSchedulerNode node = iter.next();
// Do not schedule if there are any reservations to fulfill on the node // Do not schedule if there are any reservations to fulfill on the node
RMContainer nodeReservedContainer = node.getReservedContainer();
if (iter.hasNext() && if (iter.hasNext() &&
node.getReservedContainer() != null && nodeReservedContainer != null &&
isSkipAllocateOnNodesWithReservedContainer()) { isSkipAllocateOnNodesWithReservedContainer()) {
LOG.debug("Skipping scheduling on node {} since it has already been" LOG.debug("Skipping scheduling on node {} since it has already been"
+ " reserved by {}", node.getNodeID(), + " reserved by {}", node.getNodeID(),
node.getReservedContainer().getContainerId()); nodeReservedContainer.getContainerId());
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey, activitiesManager, node, application, schedulerKey,
ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE); ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);

View File

@ -520,13 +520,13 @@ public boolean accept(Resource cluster,
// When reserve a resource (state == NEW is for new container, // When reserve a resource (state == NEW is for new container,
// state == RUNNING is for increase container). // state == RUNNING is for increase container).
// Just check if the node is not already reserved by someone // Just check if the node is not already reserved by someone
if (schedulerContainer.getSchedulerNode().getReservedContainer() RMContainer reservedContainer =
!= null) { schedulerContainer.getSchedulerNode().getReservedContainer();
if (reservedContainer != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Try to reserve a container, but the node is " LOG.debug("Try to reserve a container, but the node is "
+ "already reserved by another container=" + "already reserved by another container="
+ schedulerContainer.getSchedulerNode() + reservedContainer.getContainerId());
.getReservedContainer().getContainerId());
} }
return false; return false;
} }
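The scheduler fixes above all apply the same pattern: read the reservation reference once into a local variable, then null-check that local. A self-contained toy illustration of why the single read matters; these are stand-in types, not YARN classes:

import java.util.concurrent.atomic.AtomicReference;

final class ReservationSketch {
  // Stand-in for SchedulerNode#getReservedContainer(): a concurrent
  // un-reservation may null this between two successive reads.
  private final AtomicReference<String> reservedContainer =
      new AtomicReference<>("container_e01_0001_01_000001");

  String getReservedContainer() {
    return reservedContainer.get();
  }

  // Risky: two reads of the reference; the second may observe null.
  int reservedIdLengthRacy() {
    if (getReservedContainer() != null) {
      return getReservedContainer().length();   // NPE if un-reserved in between
    }
    return 0;
  }

  // The pattern used by the fixes: one read, then operate on the local copy.
  int reservedIdLengthSafe() {
    String reserved = getReservedContainer();
    return reserved != null ? reserved.length() : 0;
  }
}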

View File

@ -491,6 +491,8 @@
</plugin> </plugin>
<!-- The fork value is deliberately set to 0 to avoid VM crash while running tests <!-- The fork value is deliberately set to 0 to avoid VM crash while running tests
on Jenkins, removing this leads to tests crashing silently due to VM crash --> on Jenkins, removing this leads to tests crashing silently due to VM crash -->
<!-- TODO: we should investigate and address the crash issue and re-enable fork,
otherwise, JPMS args does not take effect -->
<plugin> <plugin>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<configuration> <configuration>