HADOOP-16965. Refactor abfs stream configuration. (#1956)

Contributed by Mukund Thakur.
This commit is contained in:
Mukund Thakur 2020-04-21 21:57:29 +05:30 committed by GitHub
parent 60fa15366e
commit 8031c66295
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 29 deletions

View File

@ -81,7 +81,9 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.AuthType;
@ -415,12 +417,18 @@ public OutputStream createFile(final Path path,
statistics, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
0, 0,
abfsConfiguration.getWriteBufferSize(), populateAbfsOutputStreamContext());
abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled());
} }
} }
private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
return new AbfsOutputStreamContext()
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
.enableFlush(abfsConfiguration.isFlushEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.build();
}
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@ -466,11 +474,19 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
// Add statistics for InputStream // Add statistics for InputStream
return new AbfsInputStream(client, statistics, return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), populateAbfsInputStreamContext(),
abfsConfiguration.getTolerateOobAppends(), eTag); eTag);
} }
} }
private AbfsInputStreamContext populateAbfsInputStreamContext() {
return new AbfsInputStreamContext()
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.build();
}
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
AzureBlobFileSystemException { AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@ -502,9 +518,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
statistics, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset, offset,
abfsConfiguration.getWriteBufferSize(), populateAbfsOutputStreamContext());
abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled());
} }
} }

View File

@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private boolean closed = false; private boolean closed = false;
public AbfsInputStream( public AbfsInputStream(
final AbfsClient client, final AbfsClient client,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final long contentLength, final long contentLength,
final int bufferSize, final AbfsInputStreamContext abfsInputStreamContext,
final int readAheadQueueDepth, final String eTag) {
final boolean tolerateOobAppends,
final String eTag) {
this.client = client; this.client = client;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.contentLength = contentLength; this.contentLength = contentLength;
this.bufferSize = bufferSize; this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = tolerateOobAppends; this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag; this.eTag = eTag;
this.readAheadEnabled = true; this.readAheadEnabled = true;
} }

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
/**
* Class to hold extra input stream configs.
*/
public class AbfsInputStreamContext extends AbfsStreamContext {
private int readBufferSize;
private int readAheadQueueDepth;
private boolean tolerateOobAppends;
public AbfsInputStreamContext() {
}
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
public AbfsInputStreamContext withReadAheadQueueDepth(
final int readAheadQueueDepth) {
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
? readAheadQueueDepth
: Runtime.getRuntime().availableProcessors();
return this;
}
public AbfsInputStreamContext withTolerateOobAppends(
final boolean tolerateOobAppends) {
this.tolerateOobAppends = tolerateOobAppends;
return this;
}
public AbfsInputStreamContext build() {
// Validation of parameters to be done here.
return this;
}
public int getReadBufferSize() {
return readBufferSize;
}
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
}

View File

@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final Statistics statistics; private final Statistics statistics;
public AbfsOutputStream( public AbfsOutputStream(
final AbfsClient client, final AbfsClient client,
final Statistics statistics, final Statistics statistics,
final String path, final String path,
final long position, final long position,
final int bufferSize, AbfsOutputStreamContext abfsOutputStreamContext) {
final boolean supportFlush,
final boolean disableOutputStreamFlush) {
this.client = client; this.client = client;
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.position = position; this.position = position;
this.closed = false; this.closed = false;
this.supportFlush = supportFlush; this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = disableOutputStreamFlush; this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush();
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0; this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
/**
* Class to hold extra output stream configs.
*/
public class AbfsOutputStreamContext extends AbfsStreamContext {
private int writeBufferSize;
private boolean enableFlush;
private boolean disableOutputStreamFlush;
public AbfsOutputStreamContext() {
}
public AbfsOutputStreamContext withWriteBufferSize(
final int writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}
public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
this.enableFlush = enableFlush;
return this;
}
public AbfsOutputStreamContext disableOutputStreamFlush(
final boolean disableOutputStreamFlush) {
this.disableOutputStreamFlush = disableOutputStreamFlush;
return this;
}
public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
return this;
}
public int getWriteBufferSize() {
return writeBufferSize;
}
public boolean isEnableFlush() {
return enableFlush;
}
public boolean isDisableOutputStreamFlush() {
return disableOutputStreamFlush;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
/**
* Base stream configuration class which is going
* to store common configs among input and output streams.
*/
public abstract class AbfsStreamContext {
}