HADOOP-18410. S3AInputStream.unbuffer() does not release http connections - prefetch changes (#4766)

Changes from HADOOP-18410 which are needed for the S3A prefetching stream; required
as part of the HADOOP-18703 backport.

Change-Id: Ib403ca793e29a4416e5d892f9081de5832da3b68
Steve Loughran 2022-08-31 11:16:52 +01:00
parent 312b776833
commit 8fafc83749
2 changed files with 17 additions and 93 deletions
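
For context: the underlying bug is that unbuffer() on an S3A input stream could leave the pooled HTTP connection held even though the caller was done reading for now. A minimal sketch of the calling pattern affected, assuming an illustrative bucket and object name that are not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferSketch {
  public static void main(String[] args) throws Exception {
    // illustrative path; any S3A object works
    Path path = new Path("s3a://example-bucket/data.bin");
    FileSystem fs = path.getFileSystem(new Configuration());
    byte[] buffer = new byte[8192];
    try (FSDataInputStream in = fs.open(path)) {
      in.read(buffer, 0, buffer.length);  // opens the object and reads part of it
      in.unbuffer();                      // should drain or abort the wrapped stream and
                                          // hand the http connection back to the pool
      in.read(buffer, 0, buffer.length);  // later reads re-open the object lazily
    }
  }
}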

@@ -27,6 +27,7 @@
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +37,10 @@
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Encapsulates low level interactions with S3 object on AWS.
*/
@@ -79,7 +78,7 @@ public class S3ARemoteObject {
* Maps a stream returned by openForRead() to the associated S3 object.
* That allows us to close the object when closing the stream.
*/
private Map<InputStream, S3Object> s3Objects;
private final Map<InputStream, S3Object> s3Objects;
/**
* uri of the object being read.
@@ -225,104 +224,27 @@ public InputStream openForRead(long offset, int size) throws IOException {
void close(InputStream inputStream, int numRemainingBytes) {
S3Object obj;
synchronized (s3Objects) {
obj = s3Objects.get(inputStream);
obj = s3Objects.remove(inputStream);
if (obj == null) {
throw new IllegalArgumentException("inputStream not found");
}
s3Objects.remove(inputStream);
}
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
obj,
(S3ObjectInputStream)inputStream,
false,
numRemainingBytes,
streamStatistics,
"close() operation");
if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
// don't bother with async io.
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
drainer.apply();
} else {
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
// schedule an async drain/abort with references to the fields so they
// can be reused
client.submit(
() -> drain(false, "close() operation", numRemainingBytes, obj,
inputStream));
// schedule an async drain/abort
client.submit(drainer);
}
}
/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object;
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {
try {
return invokeTrackingDuration(
streamStatistics.initiateInnerStreamClose(shouldAbort),
() -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
requestObject, inputStream));
} catch (IOException e) {
// this is only here because invokeTrackingDuration() has it in its
// signature
return shouldAbort;
}
}
/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {
if (!shouldAbort && remaining > 0) {
try {
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inputStream.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream", uri,
reason, e);
shouldAbort = true;
}
}
cleanupWithLogger(LOG, inputStream);
cleanupWithLogger(LOG, requestObject);
streamStatistics.streamClose(shouldAbort, remaining);
LOG.debug("Stream {} {}: {}; remaining={}", uri,
(shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
}
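
The drain() and drainOrAbortHttpStream() methods deleted above are replaced by the shared SDKStreamDrainer helper from HADOOP-18410, which the new close() builds and either applies inline or submits to the client's executor. A rough sketch of the drain-or-abort behaviour that helper now owns, reconstructed from the deleted code rather than from the drainer's actual implementation:

import java.io.IOException;
import java.io.InputStream;

final class DrainSketch {
  // assumed buffer size, named after the DRAIN_BUFFER_SIZE constant used in the deleted code
  private static final int DRAIN_BUFFER_SIZE = 16 * 1024;

  /** Drain remaining bytes or escalate to an abort; returns whether the stream was aborted. */
  static boolean drainOrAbort(InputStream inner, boolean shouldAbort, long remaining) {
    if (!shouldAbort && remaining > 0) {
      try {
        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
        while (inner.read(buffer) >= 0) {
          // discard until EOF so the pooled http connection can be reused
        }
      } catch (Exception e) {
        // as in the deleted code, a failed drain escalates to an abort
        shouldAbort = true;
      }
    }
    try {
      // stands in for cleanupWithLogger(LOG, inputStream) in the deleted code
      inner.close();
    } catch (IOException ignored) {
      // exceptions are swallowed, as the original did via cleanupWithLogger
    }
    return shouldAbort;
  }
}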

@@ -45,6 +45,7 @@
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
@@ -102,6 +103,7 @@ public Configuration createConfiguration() {
INPUT_FADVISE,
MAX_ERROR_RETRIES,
MAXIMUM_CONNECTIONS,
PREFETCH_ENABLED_KEY,
READAHEAD_RANGE,
REQUEST_TIMEOUT,
RETRY_LIMIT,
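
In the test file above, PREFETCH_ENABLED_KEY is added to the set of options whose base and per-bucket overrides are cleared in createConfiguration(), so the test controls prefetching itself regardless of the deployed configuration. A minimal sketch of that pattern, assuming the standard AbstractS3ATestBase and S3ATestUtils helpers; the sketch class name and the value set here are illustrative, not from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;

public class PrefetchConfigSketch extends AbstractS3ATestBase {

  @Override
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    // clear any base/bucket overrides so the test's own setting always wins
    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);  // illustrative choice
    return conf;
  }
}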