HADOOP-18781. ABFS backReference passed down to streams to avoid GC closing the FS. (#5780)

To avoid the ABFS instance being closed by GC while its streams are still working, attach the instance to an opaque BackReference object and pass that object down to the streams, so that each stream holds a hard reference to the filesystem for as long as it is in use.
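
As a minimal, self-contained illustration of the failure mode (hypothetical DemoFileSystem/DemoStream names, not Hadoop code): if a stream keeps no reference back to the filesystem that created it, the filesystem can become unreachable, and therefore collectable, while the stream is still in use.

import java.lang.ref.WeakReference;

public class GcClosesFsDemo {
  static class DemoFileSystem { } // imagine GC triggering its close via a finalizer/cleaner
  static class DemoStream { }     // holds no reference back to its filesystem

  public static void main(String[] args) throws InterruptedException {
    DemoFileSystem fs = new DemoFileSystem();
    DemoStream stream = new DemoStream();  // created by fs, but with no back reference
    WeakReference<DemoFileSystem> probe = new WeakReference<>(fs);

    fs = null;    // caller drops its only reference to the filesystem
    System.gc();  // a hint only, but usually sufficient for this demo
    Thread.sleep(100);

    // The stream is still live, yet the filesystem may already be gone.
    System.out.println("fs collected: " + (probe.get() == null)); // likely true
    System.out.println("stream alive: " + (stream != null));      // true
  }
}

Giving each stream a hard back reference, as this patch does, keeps the filesystem strongly reachable for as long as any of its streams is open.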

Contributed by: Mehakmeet Singh
Author: Mehakmeet Singh, 2023-07-11 17:57:05 +05:30, committed via GitHub
parent 6843f8e4e0
commit fac7d26c5d
9 changed files with 228 additions and 2 deletions

File: …/org/apache/hadoop/fs/impl/BackReference.java (new file)

@@ -0,0 +1,48 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.impl;

import javax.annotation.Nullable;

/**
 * Holds a reference to an object to be attached to a stream or store to
 * avoid the reference being lost to GC.
 */
public class BackReference {
  private final Object reference;

  public BackReference(@Nullable Object reference) {
    this.reference = reference;
  }

  /**
   * Is the reference null?
   * @return true if the reference is null, else false.
   */
  public boolean isNull() {
    return reference == null;
  }

  @Override
  public String toString() {
    return "BackReference{" +
        "reference=" + reference +
        '}';
  }
}
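
The class is deliberately opaque: nothing ever reads the wrapped object, so its only job is to keep that object strongly reachable. A short demo of that guarantee (illustrative code, not part of the patch):

import java.lang.ref.WeakReference;

import org.apache.hadoop.fs.impl.BackReference;

public class BackReferenceDemo {
  public static void main(String[] args) {
    Object fs = new Object();                      // stands in for the filesystem
    BackReference backRef = new BackReference(fs); // as a stream would hold it
    WeakReference<Object> probe = new WeakReference<>(fs);

    fs = null;    // caller drops its own reference
    System.gc();  // a hint only

    // backRef still holds the object, so the weak probe cannot have been cleared.
    System.out.println("reachable: " + (probe.get() != null));   // true
    System.out.println("backRef.isNull(): " + backRef.isNull()); // false
  }
}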

File: …/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -45,6 +45,7 @@
 import javax.annotation.Nullable;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
@@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem
   /** Rate limiting for operations which use it to throttle their IO. */
   private RateLimiting rateLimiting;

+  /** Storing the full path URI for better logging. */
+  private URI fullPathUri;
+
   @Override
   public void initialize(URI uri, Configuration configuration)
       throws IOException {
@@ -165,7 +169,7 @@ public void initialize(URI uri, Configuration configuration)
     setConf(configuration);
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);

+    this.fullPathUri = uri;
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     abfsCounters = new AbfsCountersImpl(uri);
     // name of the blockFactory to be used.
@@ -192,6 +196,7 @@ public void initialize(URI uri, Configuration configuration)
         .withAbfsCounters(abfsCounters)
         .withBlockFactory(blockFactory)
         .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
+        .withBackReference(new BackReference(this))
         .build();

     this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
@@ -236,7 +241,7 @@ public void initialize(URI uri, Configuration configuration)
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
-    sb.append("uri=").append(uri);
+    sb.append("uri=").append(fullPathUri);
     sb.append(", user='").append(abfsStore.getUser()).append('\'');
     sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
     sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");

File: …/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -55,6 +55,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
@@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   /** Bounded ThreadPool for this instance. */
   private ExecutorService boundedThreadPool;

+  /** ABFS instance reference to be held by the store to avoid GC close. */
+  private BackReference fsBackRef;
+
   /**
    * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
    * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
@@ -202,6 +206,7 @@ public AzureBlobFileSystemStore(
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
+    this.fsBackRef = abfsStoreBuilder.fsBackRef;

     leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
@@ -711,6 +716,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
         .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
             blockOutputActiveBlocks, true))
         .withTracingContext(tracingContext)
+        .withAbfsBackRef(fsBackRef)
         .build();
   }
@@ -818,6 +824,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
             abfsConfiguration.shouldReadBufferSizeAlways())
         .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
         .withBufferedPreadDisabled(bufferedPreadDisabled)
+        .withAbfsBackRef(fsBackRef)
         .build();
   }
@@ -1871,6 +1878,7 @@ public static final class AzureBlobFileSystemStoreBuilder {
     private AbfsCounters abfsCounters;
     private DataBlocks.BlockFactory blockFactory;
     private int blockOutputActiveBlocks;
+    private BackReference fsBackRef;

     public AzureBlobFileSystemStoreBuilder withUri(URI value) {
       this.uri = value;
@@ -1906,6 +1914,12 @@ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
       return this;
     }

+    public AzureBlobFileSystemStoreBuilder withBackReference(
+        BackReference fsBackRef) {
+      this.fsBackRef = fsBackRef;
+      return this;
+    }
+
     public AzureBlobFileSystemStoreBuilder build() {
       return this;
     }

File: …/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -26,6 +26,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
@@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
    */
   private long nextReadPos;

+  /** ABFS instance to be held by the input stream to avoid GC close. */
+  private final BackReference fsBackRef;
+
   public AbfsInputStream(
       final AbfsClient client,
       final Statistics statistics,
@@ -152,6 +156,7 @@ public AbfsInputStream(
     this.tracingContext.setStreamID(inputStreamId);
     this.context = abfsInputStreamContext;
     readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+    this.fsBackRef = abfsInputStreamContext.getFsBackRef();

     // Propagate the config values to ReadBufferManager so that the first instance
     // to initialize can set the readAheadBlockSize
@@ -857,4 +862,9 @@ long getFCursorAfterLastRead() {
   long getLimit() {
     return this.limit;
   }
+
+  @VisibleForTesting
+  BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

File: …/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

@@ -20,6 +20,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;

 /**
@@ -51,6 +53,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
   private boolean bufferedPreadDisabled;

+  /** A BackReference to the FS instance that created this InputStream. */
+  private BackReference fsBackRef;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -122,6 +127,12 @@ public AbfsInputStreamContext withBufferedPreadDisabled(
     return this;
   }

+  public AbfsInputStreamContext withAbfsBackRef(
+      final BackReference fsBackRef) {
+    this.fsBackRef = fsBackRef;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
     if (readBufferSize > readAheadBlockSize) {
       LOG.debug(
@@ -180,4 +191,8 @@ public int getReadAheadBlockSize() {
   public boolean isBufferedPreadDisabled() {
     return bufferedPreadDisabled;
   }
+
+  public BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

File: …/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -27,6 +27,7 @@
 import java.util.UUID;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
@@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   /** Executor service to carry out the parallel upload requests. */
   private final ListeningExecutorService executorService;

+  /** ABFS instance to be held by the output stream to avoid GC close. */
+  private final BackReference fsBackRef;
+
   public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
       throws IOException {
     this.client = abfsOutputStreamContext.getClient();
@@ -147,6 +151,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
     this.numOfAppendsToServerSinceLastFlush = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
+    this.fsBackRef = abfsOutputStreamContext.getFsBackRef();

     if (this.isAppendBlob) {
       this.maxConcurrentRequestCount = 1;
@@ -488,6 +493,12 @@ public synchronized void close() throws IOException {
     }

     try {
+      // Check if the executor service was shut down before the writes
+      // could be completed.
+      if (hasActiveBlockDataToUpload() && executorService.isShutdown()) {
+        throw new PathIOException(path, "Executor Service closed before "
+            + "writes could be completed.");
+      }
       flushInternal(true);
     } catch (IOException e) {
       // Problems surface in try-with-resources clauses if
@@ -766,4 +777,14 @@ public String toString() {
     sb.append("}");
     return sb.toString();
   }
+
+  @VisibleForTesting
+  BackReference getFsBackRef() {
+    return fsBackRef;
+  }
+
+  @VisibleForTesting
+  ListeningExecutorService getExecutorService() {
+    return executorService;
+  }
 }
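
The new guard in close() turns closing a stream after its filesystem into a descriptive error. A sketch of the caller-side scenario it reports, assuming a hypothetical account URI and path (illustrative only, not part of the patch):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

public class CloseOrderDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical container/account; real credentials are needed to run this.
    URI uri = URI.create("abfs://container@account.dfs.core.windows.net/");
    FileSystem fs = FileSystem.newInstance(uri, new Configuration());
    FSDataOutputStream out = fs.create(new Path("/closeOrderDemo"));
    out.write(new byte[5 * 1024]); // small write: stays buffered, not yet uploaded

    fs.close(); // closes the store and shuts down its shared executor

    try {
      out.close(); // buffered data can no longer be uploaded
    } catch (PathIOException e) {
      // The guard surfaces the path and the premature shutdown
      // instead of failing with a less descriptive error.
      System.out.println("close() failed as expected: " + e.getMessage());
    }
  }
}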

File: …/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.store.DataBlocks;

 /**
@@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   private TracingContext tracingContext;

+  /** A BackReference to the FS instance that created this OutputStream. */
+  private BackReference fsBackRef;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -157,6 +161,12 @@ public AbfsOutputStreamContext withTracingContext(
     return this;
   }

+  public AbfsOutputStreamContext withAbfsBackRef(
+      final BackReference fsBackRef) {
+    this.fsBackRef = fsBackRef;
+    return this;
+  }
+
   public AbfsOutputStreamContext build() {
     // Validation of parameters to be done here.
     if (streamStatistics == null) {
@@ -261,4 +271,8 @@ public ExecutorService getExecutorService() {
   public TracingContext getTracingContext() {
     return tracingContext;
   }
+
+  public BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

File: …/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java

@@ -32,6 +32,8 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;

 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
@@ -106,6 +108,25 @@ public void testExceptionInOptimization() throws Exception {
     }
   }

+  /**
+   * Test that the back reference is passed down to AbfsInputStream.
+   */
+  @Test
+  public void testAzureBlobFileSystemBackReferenceInInputStream()
+      throws IOException {
+    Path path = path(getMethodName());
+    // Create a file, then open it, to verify that the input stream holds a
+    // back reference.
+    try (FSDataOutputStream out = getFileSystem().create(path);
+        FSDataInputStream in = getFileSystem().open(path)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream();
+      Assertions.assertThat(abfsInputStream.getFsBackRef().isNull())
+          .describedAs("BackReference in input stream should not be null")
+          .isFalse();
+    }
+  }
+
   private void testExceptionInOptimization(final FileSystem fs,
       final Path testFilePath,
       final int seekPos, final int length, final byte[] fileContent)

File: …/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java

@@ -18,19 +18,28 @@

 package org.apache.hadoop.fs.azurebfs.services;

+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.test.LambdaTestUtils;

 /**
  * Test create operation.
  */
 public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
+
   private static final String TEST_FILE_PATH = "testfile";

   public ITestAbfsOutputStream() throws Exception {
@@ -84,4 +93,73 @@ public void testMaxRequestsAndQueueCapacity() throws Exception {
     }
   }

+  /**
+   * Verify that the AzureBlobFileSystem reference is passed down to
+   * AbfsOutputStream, so that the FS instance is not eligible for GC while
+   * writing.
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testAzureBlobFileSystemBackReferenceInOutputStream()
+      throws Exception {
+    byte[] testBytes = new byte[5 * 1024];
+    // Create the output stream using an FS instance in a separate method so
+    // that the FS instance becomes eligible for GC: once a method's frame is
+    // popped from the stack, its local variables become unreachable, which
+    // raises the chance of the FS being garbage collected.
+    try (AbfsOutputStream out = getStream()) {
+      // Every 5KB block written is flushed and a GC is hinted; if the
+      // executor service is shut down in between, the test should fail,
+      // indicating a premature shutdown while writing.
+      for (int i = 0; i < 5; i++) {
+        out.write(testBytes);
+        out.flush();
+        System.gc();
+
+        Assertions.assertThat(
+            out.getExecutorService().isShutdown() || out.getExecutorService()
+                .isTerminated())
+            .describedAs("Executor Service should not be closed before "
+                + "OutputStream while writing")
+            .isFalse();
+        Assertions.assertThat(out.getFsBackRef().isNull())
+            .describedAs("BackReference in output stream should not be null")
+            .isFalse();
+      }
+    }
+  }
+
+  /**
+   * Verify that AbfsOutputStream close() throws a PathIOException when the
+   * FS instance is closed before the stream.
+   */
+  @Test
+  public void testAbfsOutputStreamClosingFsBeforeStream()
+      throws Exception {
+    AzureBlobFileSystem fs = new AzureBlobFileSystem();
+    fs.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs = path(getMethodName());
+    byte[] inputBytes = new byte[5 * 1024];
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        pathFs)) {
+      out.write(inputBytes);
+      fs.close();
+      // Verify that closing the output stream after fs.close() raises a
+      // PathIOException containing the path being written to.
+      LambdaTestUtils
+          .intercept(PathIOException.class, getMethodName(), out::close);
+    }
+  }
+
+  /**
+   * Separate method to create an output stream using a local FS instance, so
+   * that once this method has returned, the FS instance is eligible for GC.
+   *
+   * @return AbfsOutputStream used for writing.
+   */
+  private AbfsOutputStream getStream() throws URISyntaxException, IOException {
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    fs1.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs1 = path(getMethodName() + "1");
+    return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
+  }
 }