diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java new file mode 100644 index 0000000000..04e39875b6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import javax.annotation.Nullable; + +/** + * Holds reference to an object to be attached to a stream or store to avoid + * the reference being lost to GC. + */ +public class BackReference { + private final Object reference; + + public BackReference(@Nullable Object reference) { + this.reference = reference; + } + + /** + * is the reference null? + * @return true if the ref. is null, else false. + */ + public boolean isNull() { + return reference == null; + } + + @Override + public String toString() { + return "BackReference{" + + "reference=" + reference + + '}'; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 5fb2c6e170..426ad8ca1e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem /** Rate limiting for operations which use it to throttle their IO. */ private RateLimiting rateLimiting; + /** Storing full path uri for better logging. */ + private URI fullPathUri; + @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -165,7 +169,7 @@ public void initialize(URI uri, Configuration configuration) setConf(configuration); LOG.debug("Initializing AzureBlobFileSystem for {}", uri); - + this.fullPathUri = uri; this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); abfsCounters = new AbfsCountersImpl(uri); // name of the blockFactory to be used. @@ -192,6 +196,7 @@ public void initialize(URI uri, Configuration configuration) .withAbfsCounters(abfsCounters) .withBlockFactory(blockFactory) .withBlockOutputActiveBlocks(blockOutputActiveBlocks) + .withBackReference(new BackReference(this)) .build(); this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder); @@ -236,7 +241,7 @@ public void initialize(URI uri, Configuration configuration) public String toString() { final StringBuilder sb = new StringBuilder( "AzureBlobFileSystem{"); - sb.append("uri=").append(uri); + sb.append("uri=").append(fullPathUri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 79ffc796c3..5c06270fa5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { /** Bounded ThreadPool for this instance. */ private ExecutorService boundedThreadPool; + /** ABFS instance reference to be held by the store to avoid GC close. */ + private BackReference fsBackRef; + /** * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations. * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters @@ -202,6 +206,7 @@ public AzureBlobFileSystemStore( String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; + this.fsBackRef = abfsStoreBuilder.fsBackRef; leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); @@ -711,6 +716,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool, blockOutputActiveBlocks, true)) .withTracingContext(tracingContext) + .withAbfsBackRef(fsBackRef) .build(); } @@ -818,6 +824,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( abfsConfiguration.shouldReadBufferSizeAlways()) .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) .withBufferedPreadDisabled(bufferedPreadDisabled) + .withAbfsBackRef(fsBackRef) .build(); } @@ -1871,6 +1878,7 @@ public static final class AzureBlobFileSystemStoreBuilder { private AbfsCounters abfsCounters; private DataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; + private BackReference fsBackRef; public AzureBlobFileSystemStoreBuilder withUri(URI value) { this.uri = value; @@ -1906,6 +1914,12 @@ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks( return this; } + public AzureBlobFileSystemStoreBuilder withBackReference( + BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AzureBlobFileSystemStoreBuilder build() { return this; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fdeaf70177..86442dac9a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, */ private long nextReadPos; + /** ABFS instance to be held by the input stream to avoid GC close. */ + private final BackReference fsBackRef; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -152,6 +156,7 @@ public AbfsInputStream( this.tracingContext.setStreamID(inputStreamId); this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); + this.fsBackRef = abfsInputStreamContext.getFsBackRef(); // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize @@ -857,4 +862,9 @@ long getFCursorAfterLastRead() { long getLimit() { return this.limit; } + + @VisibleForTesting + BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index e258958b1a..b78a899340 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -20,6 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; /** @@ -51,6 +53,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean bufferedPreadDisabled; + /** A BackReference to the FS instance that created this OutputStream. */ + private BackReference fsBackRef; + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -122,6 +127,12 @@ public AbfsInputStreamContext withBufferedPreadDisabled( return this; } + public AbfsInputStreamContext withAbfsBackRef( + final BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AbfsInputStreamContext build() { if (readBufferSize > readAheadBlockSize) { LOG.debug( @@ -180,4 +191,8 @@ public int getReadAheadBlockSize() { public boolean isBufferedPreadDisabled() { return bufferedPreadDisabled; } + + public BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 82e20ce5b7..4268dc3f91 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; @@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, /** Executor service to carry out the parallel upload requests. */ private final ListeningExecutorService executorService; + /** ABFS instance to be held by the output stream to avoid GC close. */ + private final BackReference fsBackRef; + public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) throws IOException { this.client = abfsOutputStreamContext.getClient(); @@ -147,6 +151,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.numOfAppendsToServerSinceLastFlush = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); + this.fsBackRef = abfsOutputStreamContext.getFsBackRef(); if (this.isAppendBlob) { this.maxConcurrentRequestCount = 1; @@ -488,6 +493,12 @@ public synchronized void close() throws IOException { } try { + // Check if Executor Service got shutdown before the writes could be + // completed. + if (hasActiveBlockDataToUpload() && executorService.isShutdown()) { + throw new PathIOException(path, "Executor Service closed before " + + "writes could be completed."); + } flushInternal(true); } catch (IOException e) { // Problems surface in try-with-resources clauses if @@ -766,4 +777,14 @@ public String toString() { sb.append("}"); return sb.toString(); } + + @VisibleForTesting + BackReference getFsBackRef() { + return fsBackRef; + } + + @VisibleForTesting + ListeningExecutorService getExecutorService() { + return executorService; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index ed89733036..1d1a99c7d9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.fs.store.DataBlocks; /** @@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private TracingContext tracingContext; + /** A BackReference to the FS instance that created this OutputStream. */ + private BackReference fsBackRef; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -157,6 +161,12 @@ public AbfsOutputStreamContext withTracingContext( return this; } + public AbfsOutputStreamContext withAbfsBackRef( + final BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. if (streamStatistics == null) { @@ -261,4 +271,8 @@ public ExecutorService getExecutorService() { public TracingContext getTracingContext() { return tracingContext; } + + public BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 66f072501d..2ac58fbcb1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import org.assertj.core.api.Assertions; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; @@ -106,6 +108,25 @@ public void testExceptionInOptimization() throws Exception { } } + /** + * Testing the back reference being passed down to AbfsInputStream. + */ + @Test + public void testAzureBlobFileSystemBackReferenceInInputStream() + throws IOException { + Path path = path(getMethodName()); + // Create a file then open it to verify if this input stream contains any + // back reference. + try (FSDataOutputStream out = getFileSystem().create(path); + FSDataInputStream in = getFileSystem().open(path)) { + AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream(); + + Assertions.assertThat(abfsInputStream.getFsBackRef().isNull()) + .describedAs("BackReference in input stream should not be null") + .isFalse(); + } + } + private void testExceptionInOptimization(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, final byte[] fileContent) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index 431c456ae3..eee0c177c3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -18,19 +18,28 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.test.LambdaTestUtils; /** * Test create operation. */ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest { + + private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000; private static final String TEST_FILE_PATH = "testfile"; public ITestAbfsOutputStream() throws Exception { @@ -84,4 +93,73 @@ public void testMaxRequestsAndQueueCapacity() throws Exception { } } + /** + * Verify the passing of AzureBlobFileSystem reference to AbfsOutputStream + * to make sure that the FS instance is not eligible for GC while writing. + */ + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testAzureBlobFileSystemBackReferenceInOutputStream() + throws Exception { + byte[] testBytes = new byte[5 * 1024]; + // Creating an output stream using a FS in a separate method to make the + // FS instance used eligible for GC. Since when a method is popped from + // the stack frame, it's variables become anonymous, this creates higher + // chance of getting Garbage collected. + try (AbfsOutputStream out = getStream()) { + // Every 5KB block written is flushed and a GC is hinted, if the + // executor service is shut down in between, the test should fail + // indicating premature shutdown while writing. + for (int i = 0; i < 5; i++) { + out.write(testBytes); + out.flush(); + System.gc(); + Assertions.assertThat( + out.getExecutorService().isShutdown() || out.getExecutorService() + .isTerminated()) + .describedAs("Executor Service should not be closed before " + + "OutputStream while writing") + .isFalse(); + Assertions.assertThat(out.getFsBackRef().isNull()) + .describedAs("BackReference in output stream should not be null") + .isFalse(); + } + } + } + + /** + * Verify AbfsOutputStream close() behaviour of throwing a PathIOE when the + * FS instance is closed before the stream. + */ + @Test + public void testAbfsOutputStreamClosingFsBeforeStream() + throws Exception { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + fs.initialize(new URI(getTestUrl()), new Configuration()); + Path pathFs = path(getMethodName()); + byte[] inputBytes = new byte[5 * 1024]; + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + pathFs)) { + out.write(inputBytes); + fs.close(); + // verify that output stream close after fs.close() would raise a + // pathIOE containing the path being written to. + LambdaTestUtils + .intercept(PathIOException.class, getMethodName(), out::close); + } + } + + /** + * Separate method to create an outputStream using a local FS instance so + * that once this method has returned, the FS instance can be eligible for GC. + * + * @return AbfsOutputStream used for writing. + */ + private AbfsOutputStream getStream() throws URISyntaxException, IOException { + AzureBlobFileSystem fs1 = new AzureBlobFileSystem(); + fs1.initialize(new URI(getTestUrl()), new Configuration()); + Path pathFs1 = path(getMethodName() + "1"); + + return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1); + } + }