Revert "HADOOP-18706. Improve S3ABlockOutputStream recovery (#5563)"
This reverts commit 372631c5667b02b5c9f280ab4c09a3d71a7ee36d. Reverted due to HADOOP-18744.
This commit is contained in:
parent
e9740cb17a
commit
e6b54f7f68
hadoop-tools/hadoop-aws/src
main/java/org/apache/hadoop/fs/s3a
test/java/org/apache/hadoop/fs/s3a
@ -232,9 +232,7 @@ class S3ABlockOutputStream extends OutputStream implements
|
|||||||
LOG.error("Number of partitions in stream exceeds limit for S3: "
|
LOG.error("Number of partitions in stream exceeds limit for S3: "
|
||||||
+ Constants.MAX_MULTIPART_COUNT + " write may fail.");
|
+ Constants.MAX_MULTIPART_COUNT + " write may fail.");
|
||||||
}
|
}
|
||||||
activeBlock = blockFactory.create(
|
activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
|
||||||
writeOperationHelper.getAuditSpan().getSpanId(),
|
|
||||||
key, blockCount, this.blockSize, statistics);
|
|
||||||
}
|
}
|
||||||
return activeBlock;
|
return activeBlock;
|
||||||
}
|
}
|
||||||
@ -730,14 +728,8 @@ class S3ABlockOutputStream extends OutputStream implements
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Shared processing of Syncable operation reporting/downgrade.
|
* Shared processing of Syncable operation reporting/downgrade.
|
||||||
*
|
|
||||||
* Syncable API is not supported, so calls to hsync/hflush will throw an
|
|
||||||
* UnsupportedOperationException unless the stream was constructed with
|
|
||||||
* {@link #downgradeSyncableExceptions} set to true, in which case the stream is flushed.
|
|
||||||
* @throws IOException IO Problem
|
|
||||||
* @throws UnsupportedOperationException if downgrade syncable exceptions is set to false
|
|
||||||
*/
|
*/
|
||||||
private void handleSyncableInvocation() throws IOException {
|
private void handleSyncableInvocation() {
|
||||||
final UnsupportedOperationException ex
|
final UnsupportedOperationException ex
|
||||||
= new UnsupportedOperationException(E_NOT_SYNCABLE);
|
= new UnsupportedOperationException(E_NOT_SYNCABLE);
|
||||||
if (!downgradeSyncableExceptions) {
|
if (!downgradeSyncableExceptions) {
|
||||||
@ -749,7 +741,6 @@ class S3ABlockOutputStream extends OutputStream implements
|
|||||||
key);
|
key);
|
||||||
// and log at debug
|
// and log at debug
|
||||||
LOG.debug("Downgrading Syncable call", ex);
|
LOG.debug("Downgrading Syncable call", ex);
|
||||||
flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -175,14 +175,12 @@ final class S3ADataBlocks {
|
|||||||
/**
|
/**
|
||||||
* Create a block.
|
* Create a block.
|
||||||
*
|
*
|
||||||
* @param spanId id of the audit span
|
|
||||||
* @param key key of s3 object being written to
|
|
||||||
* @param index index of block
|
* @param index index of block
|
||||||
* @param limit limit of the block.
|
* @param limit limit of the block.
|
||||||
* @param statistics stats to work with
|
* @param statistics stats to work with
|
||||||
* @return a new block.
|
* @return a new block.
|
||||||
*/
|
*/
|
||||||
abstract DataBlock create(String spanId, String key, long index, long limit,
|
abstract DataBlock create(long index, long limit,
|
||||||
BlockOutputStreamStatistics statistics)
|
BlockOutputStreamStatistics statistics)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
@ -393,11 +391,11 @@ final class S3ADataBlocks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
DataBlock create(String spanId, String key, long index, long limit,
|
DataBlock create(long index, long limit,
|
||||||
BlockOutputStreamStatistics statistics)
|
BlockOutputStreamStatistics statistics)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkArgument(limit > 0,
|
Preconditions.checkArgument(limit > 0,
|
||||||
"Invalid block size: %d [%s]", limit, key);
|
"Invalid block size: %d", limit);
|
||||||
return new ByteArrayBlock(0, limit, statistics);
|
return new ByteArrayBlock(0, limit, statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,11 +516,11 @@ final class S3ADataBlocks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
ByteBufferBlock create(String spanId, String key, long index, long limit,
|
ByteBufferBlock create(long index, long limit,
|
||||||
BlockOutputStreamStatistics statistics)
|
BlockOutputStreamStatistics statistics)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkArgument(limit > 0,
|
Preconditions.checkArgument(limit > 0,
|
||||||
"Invalid block size: %d [%s]", limit, key);
|
"Invalid block size: %d", limit);
|
||||||
return new ByteBufferBlock(index, limit, statistics);
|
return new ByteBufferBlock(index, limit, statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -800,8 +798,6 @@ final class S3ADataBlocks {
|
|||||||
* Buffer blocks to disk.
|
* Buffer blocks to disk.
|
||||||
*/
|
*/
|
||||||
static class DiskBlockFactory extends BlockFactory {
|
static class DiskBlockFactory extends BlockFactory {
|
||||||
private static final String ESCAPED_FORWARD_SLASH = "EFS";
|
|
||||||
private static final String ESCAPED_BACKSLASH = "EBS";
|
|
||||||
|
|
||||||
DiskBlockFactory(S3AFileSystem owner) {
|
DiskBlockFactory(S3AFileSystem owner) {
|
||||||
super(owner);
|
super(owner);
|
||||||
@ -810,8 +806,6 @@ final class S3ADataBlocks {
|
|||||||
/**
|
/**
|
||||||
* Create a temp file and a {@link DiskBlock} instance to manage it.
|
* Create a temp file and a {@link DiskBlock} instance to manage it.
|
||||||
*
|
*
|
||||||
* @param spanId id of the audit span
|
|
||||||
* @param key of the s3 object being written
|
|
||||||
* @param index block index
|
* @param index block index
|
||||||
* @param limit limit of the block. -1 means "no limit"
|
* @param limit limit of the block. -1 means "no limit"
|
||||||
* @param statistics statistics to update
|
* @param statistics statistics to update
|
||||||
@ -819,22 +813,17 @@ final class S3ADataBlocks {
|
|||||||
* @throws IOException IO problems
|
* @throws IOException IO problems
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
DataBlock create(String spanId, String key, long index,
|
DataBlock create(long index,
|
||||||
long limit,
|
long limit,
|
||||||
BlockOutputStreamStatistics statistics)
|
BlockOutputStreamStatistics statistics)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkArgument(limit != 0,
|
Preconditions.checkArgument(limit != 0,
|
||||||
"Invalid block size: %d [%s]", limit, key);
|
"Invalid block size: %d", limit);
|
||||||
String prefix = String.format("s3ablock-%04d-%s-%s-", index, spanId, escapeS3Key(key));
|
File destFile = getOwner()
|
||||||
File destFile = getOwner().createTmpFileForWrite(prefix, limit, getOwner().getConf());
|
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
|
||||||
|
limit, getOwner().getConf());
|
||||||
return new DiskBlock(destFile, limit, index, statistics);
|
return new DiskBlock(destFile, limit, index, statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static String escapeS3Key(String key) {
|
|
||||||
return key
|
|
||||||
.replace("\\", ESCAPED_BACKSLASH)
|
|
||||||
.replace("/", ESCAPED_FORWARD_SLASH);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,7 +217,6 @@ public class WriteOperationHelper implements WriteOperations {
|
|||||||
* Get the audit span this object was created with.
|
* Get the audit span this object was created with.
|
||||||
* @return the audit span
|
* @return the audit span
|
||||||
*/
|
*/
|
||||||
@Override
|
|
||||||
public AuditSpan getAuditSpan() {
|
public AuditSpan getAuditSpan() {
|
||||||
return auditSpan;
|
return auditSpan;
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path;
|
|||||||
import org.apache.hadoop.fs.PathIOException;
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
|
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
|
||||||
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
||||||
import org.apache.hadoop.fs.store.audit.AuditSpan;
|
|
||||||
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
|
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
|
||||||
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
||||||
|
|
||||||
@ -306,12 +305,6 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
|
|||||||
*/
|
*/
|
||||||
Configuration getConf();
|
Configuration getConf();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the audit span this object was created with.
|
|
||||||
* @return the audit span
|
|
||||||
*/
|
|
||||||
AuditSpan getAuditSpan();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a S3 Select request for the destination path.
|
* Create a S3 Select request for the destination path.
|
||||||
* This does not build the query.
|
* This does not build the query.
|
||||||
|
@ -29,12 +29,9 @@ import org.apache.hadoop.io.IOUtils;
|
|||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
|
import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
@ -82,46 +79,6 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
|
|||||||
verifyUpload("regular", 1024);
|
verifyUpload("regular", 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that the DiskBlock's local file doesn't result in error when the S3 key exceeds the max
|
|
||||||
* char limit of the local file system. Currently
|
|
||||||
* {@link java.io.File#createTempFile(String, String, File)} is being relied on to handle the
|
|
||||||
* truncation.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testDiskBlockCreate() throws IOException {
|
|
||||||
String s3Key = // 1024 char
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
|
|
||||||
"very_long_s3_key";
|
|
||||||
long blockSize = getFileSystem().getDefaultBlockSize();
|
|
||||||
try (S3ADataBlocks.BlockFactory diskBlockFactory =
|
|
||||||
new S3ADataBlocks.DiskBlockFactory(getFileSystem());
|
|
||||||
S3ADataBlocks.DataBlock dataBlock =
|
|
||||||
diskBlockFactory.create("spanId", s3Key, 1, blockSize, null);
|
|
||||||
) {
|
|
||||||
String tmpDir = getConfiguration().get("hadoop.tmp.dir");
|
|
||||||
boolean created = Arrays.stream(
|
|
||||||
Objects.requireNonNull(new File(tmpDir).listFiles()))
|
|
||||||
.anyMatch(f -> f.getName().contains("very_long_s3_key"));
|
|
||||||
assertTrue(String.format("tmp file should have been created locally in %s", tmpDir), created);
|
|
||||||
LOG.info(dataBlock.toString()); // block file name/location can be viewed in failsafe-report
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = IOException.class)
|
@Test(expected = IOException.class)
|
||||||
public void testWriteAfterStreamClose() throws Throwable {
|
public void testWriteAfterStreamClose() throws Throwable {
|
||||||
Path dest = path("testWriteAfterStreamClose");
|
Path dest = path("testWriteAfterStreamClose");
|
||||||
@ -179,7 +136,7 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
|
|||||||
new S3AInstrumentation(new URI("s3a://example"));
|
new S3AInstrumentation(new URI("s3a://example"));
|
||||||
BlockOutputStreamStatistics outstats
|
BlockOutputStreamStatistics outstats
|
||||||
= instrumentation.newOutputStreamStatistics(null);
|
= instrumentation.newOutputStreamStatistics(null);
|
||||||
S3ADataBlocks.DataBlock block = factory.create("spanId", "object/key", 1, BLOCK_SIZE, outstats);
|
S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
|
||||||
block.write(dataset, 0, dataset.length);
|
block.write(dataset, 0, dataset.length);
|
||||||
S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
|
S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
|
||||||
InputStream stream = uploadData.getUploadStream();
|
InputStream stream = uploadData.getUploadStream();
|
||||||
|
@ -51,7 +51,7 @@ public class TestDataBlocks extends Assert {
|
|||||||
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
|
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
|
||||||
int limit = 128;
|
int limit = 128;
|
||||||
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
|
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
|
||||||
= factory.create("spanId", "s3\\object/key", 1, limit, null);
|
= factory.create(1, limit, null);
|
||||||
assertOutstandingBuffers(factory, 1);
|
assertOutstandingBuffers(factory, 1);
|
||||||
|
|
||||||
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
|
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
|
||||||
|
@ -26,7 +26,6 @@ import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
|
|||||||
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
|
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
|
||||||
import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks;
|
import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks;
|
||||||
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
|
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
|
||||||
import org.apache.hadoop.fs.store.audit.AuditSpan;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -39,10 +38,7 @@ import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
|
|||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -63,9 +59,6 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
|
|||||||
mock(S3ADataBlocks.BlockFactory.class);
|
mock(S3ADataBlocks.BlockFactory.class);
|
||||||
long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
|
long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
|
||||||
WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
|
WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
|
||||||
AuditSpan auditSpan = mock(AuditSpan.class);
|
|
||||||
when(auditSpan.getSpanId()).thenReturn("spanId");
|
|
||||||
when(oHelper.getAuditSpan()).thenReturn(auditSpan);
|
|
||||||
PutTracker putTracker = mock(PutTracker.class);
|
PutTracker putTracker = mock(PutTracker.class);
|
||||||
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
|
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
|
||||||
S3ABlockOutputStream.builder()
|
S3ABlockOutputStream.builder()
|
||||||
@ -163,7 +156,6 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
|
|||||||
stream = spy(new S3ABlockOutputStream(builder));
|
stream = spy(new S3ABlockOutputStream(builder));
|
||||||
intercept(UnsupportedOperationException.class, () -> stream.hflush());
|
intercept(UnsupportedOperationException.class, () -> stream.hflush());
|
||||||
intercept(UnsupportedOperationException.class, () -> stream.hsync());
|
intercept(UnsupportedOperationException.class, () -> stream.hsync());
|
||||||
verify(stream, never()).flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -177,11 +169,8 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
|
|||||||
builder.withDowngradeSyncableExceptions(true);
|
builder.withDowngradeSyncableExceptions(true);
|
||||||
stream = spy(new S3ABlockOutputStream(builder));
|
stream = spy(new S3ABlockOutputStream(builder));
|
||||||
|
|
||||||
verify(stream, never()).flush();
|
|
||||||
stream.hflush();
|
stream.hflush();
|
||||||
verify(stream, times(1)).flush();
|
|
||||||
stream.hsync();
|
stream.hsync();
|
||||||
verify(stream, times(2)).flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user