HADOOP-17461. Collect thread-level IOStatistics. (#4352)

This adds a thread-level collector of IOStatistics, IOStatisticsContext,
which can be:
* Retrieved for a thread and cached for access from other
  threads.
* reset() to record new statistics.
* Queried for live statistics through the
  IOStatisticsSource.getIOStatistics() method.
* Queries for a statistics aggregator for use in instrumented
  classes.
* Asked to create a serializable copy in snapshot()

The goal is to make it possible for applications with multiple
threads performing different work items simultaneously
to be able to collect statistics on the individual threads,
and so generate aggregate reports on the total work performed
for a specific job, query or similar unit of work.

Some changes in IOStatistics-gathering classes are needed for 
this feature
* Caching the active context's aggregator in the object's
  constructor
* Updating it in close()

Slightly more work is needed in multithreaded code,
such as the S3A committers, which collect statistics across
all threads used in task and job commit operations.

Currently the IOStatisticsContext-aware classes are:
* The S3A input stream, output stream and list iterators.
* RawLocalFileSystem's input and output streams.
* The S3A committers.
* The TaskPool class in hadoop-common, which propagates
  the active context into scheduled worker threads.

Collection of statistics in the IOStatisticsContext
is disabled process-wide by default until the feature 
is considered stable.

To enable the collection, set the option
fs.thread.level.iostatistics.enabled
to "true" in core-site.xml;
	
Contributed by Mehakmeet Singh and Steve Loughran
This commit is contained in:
Mehakmeet Singh 2022-07-27 01:11:22 +05:30 committed by GitHub
parent 213ea03758
commit 4c8cd61961
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1458 additions and 79 deletions

View File

@ -475,4 +475,18 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
* default hadoop temp dir on local system: {@value}. * default hadoop temp dir on local system: {@value}.
*/ */
public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
/**
* Thread-level IOStats Support.
* {@value}
*/
public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED =
"fs.thread.level.iostatistics.enabled";
/**
* Default value for Thread-level IOStats Support is true.
*/
public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT =
true;
} }

View File

@ -57,6 +57,8 @@
import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream; import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@ -156,11 +158,19 @@ class LocalFSFileInputStream extends FSInputStream implements
/** Reference to the bytes read counter for slightly faster counting. */ /** Reference to the bytes read counter for slightly faster counting. */
private final AtomicLong bytesRead; private final AtomicLong bytesRead;
/**
* Thread level IOStatistics aggregator to update in close().
*/
private final IOStatisticsAggregator
ioStatisticsAggregator;
public LocalFSFileInputStream(Path f) throws IOException { public LocalFSFileInputStream(Path f) throws IOException {
name = pathToFile(f); name = pathToFile(f);
fis = new FileInputStream(name); fis = new FileInputStream(name);
bytesRead = ioStatistics.getCounterReference( bytesRead = ioStatistics.getCounterReference(
STREAM_READ_BYTES); STREAM_READ_BYTES);
ioStatisticsAggregator =
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
} }
@Override @Override
@ -193,9 +203,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
fis.close(); try {
if (asyncChannel != null) { fis.close();
asyncChannel.close(); if (asyncChannel != null) {
asyncChannel.close();
}
} finally {
ioStatisticsAggregator.aggregate(ioStatistics);
} }
} }
@ -278,6 +292,7 @@ public boolean hasCapability(String capability) {
// new capabilities. // new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) { switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamCapabilities.VECTOREDIO: case StreamCapabilities.VECTOREDIO:
return true; return true;
default: default:
@ -407,9 +422,19 @@ final class LocalFSFileOutputStream extends OutputStream implements
STREAM_WRITE_EXCEPTIONS) STREAM_WRITE_EXCEPTIONS)
.build(); .build();
/**
* Thread level IOStatistics aggregator to update in close().
*/
private final IOStatisticsAggregator
ioStatisticsAggregator;
private LocalFSFileOutputStream(Path f, boolean append, private LocalFSFileOutputStream(Path f, boolean append,
FsPermission permission) throws IOException { FsPermission permission) throws IOException {
File file = pathToFile(f); File file = pathToFile(f);
// store the aggregator before attempting any IO.
ioStatisticsAggregator =
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
if (!append && permission == null) { if (!append && permission == null) {
permission = FsPermission.getFileDefault(); permission = FsPermission.getFileDefault();
} }
@ -436,10 +461,17 @@ private LocalFSFileOutputStream(Path f, boolean append,
} }
/* /*
* Just forward to the fos * Close the fos; update the IOStatisticsContext.
*/ */
@Override @Override
public void close() throws IOException { fos.close(); } public void close() throws IOException {
try {
fos.close();
} finally {
ioStatisticsAggregator.aggregate(ioStatistics);
}
}
@Override @Override
public void flush() throws IOException { fos.flush(); } public void flush() throws IOException { fos.flush(); }
@Override @Override
@ -485,6 +517,7 @@ public boolean hasCapability(String capability) {
// new capabilities. // new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) { switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
return true; return true;
default: default:
return StoreImplementationUtils.isProbeForSyncable(capability); return StoreImplementationUtils.isProbeForSyncable(capability);

View File

@ -93,6 +93,12 @@ public interface StreamCapabilities {
*/ */
String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM; String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;
/**
* Streams that support IOStatistics context and capture thread-level
* IOStatistics.
*/
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";
/** /**
* Capabilities that a stream can support and be queried for. * Capabilities that a stream can support and be queried for.
*/ */

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.impl; package org.apache.hadoop.fs.impl;
import java.lang.ref.WeakReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -48,7 +49,17 @@ public long currentThreadId() {
} }
public V setForCurrentThread(V newVal) { public V setForCurrentThread(V newVal) {
return put(currentThreadId(), newVal); long id = currentThreadId();
// if the same object is already in the map, just return it.
WeakReference<V> ref = lookup(id);
// Reference value could be set to null. Thus, ref.get() could return
// null. Should be handled accordingly while using the returned value.
if (ref != null && ref.get() == newVal) {
return ref.get();
}
return put(id, newVal);
} }
} }

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;
/**
* An interface defined to capture thread-level IOStatistics by using per
* thread context.
* <p>
* The aggregator should be collected in their constructor by statistics-generating
* classes to obtain the aggregator to update <i>across all threads</i>.
* <p>
* The {@link #snapshot()} call creates a snapshot of the statistics;
* <p>
* The {@link #reset()} call resets the statistics in the context so
* that later snapshots will get the incremental data.
*/
public interface IOStatisticsContext extends IOStatisticsSource {
/**
* Get the IOStatisticsAggregator for the context.
*
* @return return the aggregator for the context.
*/
IOStatisticsAggregator getAggregator();
/**
* Capture the snapshot of the context's IOStatistics.
*
* @return IOStatisticsSnapshot for the context.
*/
IOStatisticsSnapshot snapshot();
/**
* Get a unique ID for this context, for logging
* purposes.
*
* @return an ID unique for all contexts in this process.
*/
long getID();
/**
* Reset the context's IOStatistics.
*/
void reset();
/**
* Get the context's IOStatisticsContext.
*
* @return instance of IOStatisticsContext for the context.
*/
static IOStatisticsContext getCurrentIOStatisticsContext() {
return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
}
/**
* Set the IOStatisticsContext for the current thread.
* @param statisticsContext IOStatistics context instance for the
* current thread. If null, the context is reset.
*/
static void setThreadIOStatisticsContext(
IOStatisticsContext statisticsContext) {
IOStatisticsContextIntegration.setThreadIOStatisticsContext(
statisticsContext);
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics.impl;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
/**
* Empty IOStatistics context which serves no-op for all the operations and
* returns an empty Snapshot if asked.
*
*/
final class EmptyIOStatisticsContextImpl implements IOStatisticsContext {
private static final IOStatisticsContext EMPTY_CONTEXT = new EmptyIOStatisticsContextImpl();
private EmptyIOStatisticsContextImpl() {
}
/**
* Create a new empty snapshot.
* A new one is always created for isolation.
*
* @return a statistics snapshot
*/
@Override
public IOStatisticsSnapshot snapshot() {
return new IOStatisticsSnapshot();
}
@Override
public IOStatisticsAggregator getAggregator() {
return EmptyIOStatisticsStore.getInstance();
}
@Override
public IOStatistics getIOStatistics() {
return EmptyIOStatistics.getInstance();
}
@Override
public void reset() {}
/**
* The ID is always 0.
* As the real context implementation counter starts at 1,
* we are guaranteed to have unique IDs even between them and
* the empty context.
* @return 0
*/
@Override
public long getID() {
return 0;
}
/**
* Get the single instance.
* @return an instance.
*/
static IOStatisticsContext getInstance() {
return EMPTY_CONTEXT;
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
/**
* Implementing the IOStatisticsContext.
*
* A Context defined for IOStatistics collection per thread which captures
* each worker thread's work in FS streams and stores it in the form of
* IOStatisticsSnapshot.
*
* For the current thread the IOStatisticsSnapshot can be used as a way to
* move the IOStatistics data between applications using the Serializable
* nature of the class.
*/
public final class IOStatisticsContextImpl implements IOStatisticsContext {
private static final Logger LOG =
LoggerFactory.getLogger(IOStatisticsContextImpl.class);
/**
* Thread ID.
*/
private final long threadId;
/**
* Unique ID.
*/
private final long id;
/**
* IOStatistics to aggregate.
*/
private final IOStatisticsSnapshot ioStatistics = new IOStatisticsSnapshot();
/**
* Constructor.
* @param threadId thread ID
* @param id instance ID.
*/
public IOStatisticsContextImpl(final long threadId, final long id) {
this.threadId = threadId;
this.id = id;
}
@Override
public String toString() {
return "IOStatisticsContextImpl{" +
"id=" + id +
", threadId=" + threadId +
", ioStatistics=" + ioStatistics +
'}';
}
/**
* Get the IOStatisticsAggregator of the context.
* @return the instance of IOStatisticsAggregator for this context.
*/
@Override
public IOStatisticsAggregator getAggregator() {
return ioStatistics;
}
/**
* Returns a snapshot of the current thread's IOStatistics.
*
* @return IOStatisticsSnapshot of the context.
*/
@Override
public IOStatisticsSnapshot snapshot() {
LOG.debug("Taking snapshot of IOStatisticsContext id {}", id);
return new IOStatisticsSnapshot(ioStatistics);
}
/**
* Reset the thread +.
*/
@Override
public void reset() {
LOG.debug("clearing IOStatisticsContext id {}", id);
ioStatistics.clear();
}
@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}
/**
* ID of this context.
* @return ID.
*/
@Override
public long getID() {
return id;
}
/**
* Get the thread ID.
* @return thread ID.
*/
public long getThreadID() {
return threadId;
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics.impl;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
/**
* A Utility class for IOStatisticsContext, which helps in creating and
* getting the current active context. Static methods in this class allows to
* get the current context to start aggregating the IOStatistics.
*
* Static initializer is used to work out if the feature to collect
* thread-level IOStatistics is enabled or not and the corresponding
* implementation class is called for it.
*
* Weak Reference thread map to be used to keep track of different context's
* to avoid long-lived memory leakages as these references would be cleaned
* up at GC.
*/
public final class IOStatisticsContextIntegration {
private static final Logger LOG =
LoggerFactory.getLogger(IOStatisticsContextIntegration.class);
/**
* Is thread-level IO Statistics enabled?
*/
private static boolean isThreadIOStatsEnabled;
/**
* ID for next instance to create.
*/
public static final AtomicLong INSTANCE_ID = new AtomicLong(1);
/**
* Active IOStatistics Context containing different worker thread's
* statistics. Weak Reference so that it gets cleaned up during GC and we
* avoid any memory leak issues due to long lived references.
*/
private static final WeakReferenceThreadMap<IOStatisticsContext>
ACTIVE_IOSTATS_CONTEXT =
new WeakReferenceThreadMap<>(
IOStatisticsContextIntegration::createNewInstance,
IOStatisticsContextIntegration::referenceLostContext
);
static {
// Work out if the current context has thread level IOStatistics enabled.
final Configuration configuration = new Configuration();
isThreadIOStatsEnabled =
configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
}
/**
* Private constructor for a utility class to be used in IOStatisticsContext.
*/
private IOStatisticsContextIntegration() {}
/**
* Creating a new IOStatisticsContext instance for a FS to be used.
* @param key Thread ID that represents which thread the context belongs to.
* @return an instance of IOStatisticsContext.
*/
private static IOStatisticsContext createNewInstance(Long key) {
return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
}
/**
* In case of reference loss for IOStatisticsContext.
* @param key ThreadID.
*/
private static void referenceLostContext(Long key) {
LOG.debug("Reference lost for threadID for the context: {}", key);
}
/**
* Get the current thread's IOStatisticsContext instance. If no instance is
* present for this thread ID, create one using the factory.
* @return instance of IOStatisticsContext.
*/
public static IOStatisticsContext getCurrentIOStatisticsContext() {
return isThreadIOStatsEnabled
? ACTIVE_IOSTATS_CONTEXT.getForCurrentThread()
: EmptyIOStatisticsContextImpl.getInstance();
}
/**
* Set the IOStatisticsContext for the current thread.
* @param statisticsContext IOStatistics context instance for the
* current thread. If null, the context is reset.
*/
public static void setThreadIOStatisticsContext(
IOStatisticsContext statisticsContext) {
if (isThreadIOStatsEnabled) {
if (statisticsContext == null) {
ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
}
if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
}
}
}
/**
* Get thread ID specific IOStatistics values if
* statistics are enabled and the thread ID is in the map.
* @param testThreadId thread ID.
* @return IOStatisticsContext if found in the map.
*/
@VisibleForTesting
public static IOStatisticsContext getThreadSpecificIOStatisticsContext(long testThreadId) {
LOG.debug("IOStatsContext thread ID required: {}", testThreadId);
if (!isThreadIOStatsEnabled) {
return null;
}
// lookup the weakRef IOStatisticsContext for the thread ID in the
// ThreadMap.
WeakReference<IOStatisticsContext> ioStatisticsSnapshotWeakReference =
ACTIVE_IOSTATS_CONTEXT.lookup(testThreadId);
if (ioStatisticsSnapshotWeakReference != null) {
return ioStatisticsSnapshotWeakReference.get();
}
return null;
}
/**
* A method to enable IOStatisticsContext to override if set otherwise in
* the configurations for tests.
*/
@VisibleForTesting
public static void enableIOStatisticsContext() {
if (!isThreadIOStatsEnabled) {
LOG.info("Enabling Thread IOStatistics..");
isThreadIOStatsEnabled = true;
}
}
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable; import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
@ -136,6 +137,15 @@ public static class Builder<I> {
private boolean stopAbortsOnFailure = false; private boolean stopAbortsOnFailure = false;
private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION; private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;
/**
* IOStatisticsContext to switch to in all threads
* taking part in the commit operation.
* This ensures that the IOStatistics collected in the
* worker threads will be aggregated into the total statistics
* of the thread calling the committer commit/abort methods.
*/
private IOStatisticsContext ioStatisticsContext = null;
/** /**
* Create the builder. * Create the builder.
* @param items items to process * @param items items to process
@ -242,7 +252,7 @@ public Builder<I> stopAbortsOnFailure() {
* @param value new value * @param value new value
* @return the builder * @return the builder
*/ */
public Builder sleepInterval(final int value) { public Builder<I> sleepInterval(final int value) {
sleepInterval = value; sleepInterval = value;
return this; return this;
} }
@ -364,6 +374,8 @@ private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
/** /**
* Parallel execution. * Parallel execution.
* All tasks run within the same IOStatisticsContext as the
* thread calling this method.
* @param task task to execute * @param task task to execute
* @param <E> exception which may be raised in execution. * @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully * @return true if the operation executed successfully
@ -379,64 +391,70 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
final AtomicBoolean revertFailed = new AtomicBoolean(false); final AtomicBoolean revertFailed = new AtomicBoolean(false);
List<Future<?>> futures = new ArrayList<>(); List<Future<?>> futures = new ArrayList<>();
ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext();
IOException iteratorIOE = null; IOException iteratorIOE = null;
final RemoteIterator<I> iterator = this.items; final RemoteIterator<I> iterator = this.items;
try { try {
while(iterator.hasNext()) { while (iterator.hasNext()) {
final I item = iterator.next(); final I item = iterator.next();
// submit a task for each item that will either run or abort the task // submit a task for each item that will either run or abort the task
futures.add(service.submit(() -> { futures.add(service.submit(() -> {
if (!(stopOnFailure && taskFailed.get())) { setStatisticsContext();
// run the task try {
boolean threw = true; if (!(stopOnFailure && taskFailed.get())) {
try { // prepare and run the task
LOG.debug("Executing task"); boolean threw = true;
task.run(item); try {
succeeded.add(item); LOG.debug("Executing task");
LOG.debug("Task succeeded"); task.run(item);
succeeded.add(item);
LOG.debug("Task succeeded");
threw = false; threw = false;
} catch (Exception e) { } catch (Exception e) {
taskFailed.set(true); taskFailed.set(true);
exceptions.add(e); exceptions.add(e);
LOG.info("Task failed {}", e.toString()); LOG.info("Task failed {}", e.toString());
LOG.debug("Task failed", e); LOG.debug("Task failed", e);
if (onFailure != null) { if (onFailure != null) {
try { try {
onFailure.run(item, e); onFailure.run(item, e);
} catch (Exception failException) { } catch (Exception failException) {
LOG.warn("Failed to clean up on failure", e); LOG.warn("Failed to clean up on failure", e);
// swallow the exception // swallow the exception
}
}
} finally {
if (threw) {
taskFailed.set(true);
} }
} }
} finally {
if (threw) { } else if (abortTask != null) {
taskFailed.set(true); // abort the task instead of running it
} if (stopAbortsOnFailure && abortFailed.get()) {
} return;
}
} else if (abortTask != null) {
// abort the task instead of running it boolean failed = true;
if (stopAbortsOnFailure && abortFailed.get()) { try {
return; LOG.info("Aborting task");
} abortTask.run(item);
failed = false;
boolean failed = true; } catch (Exception e) {
try { LOG.error("Failed to abort task", e);
LOG.info("Aborting task"); // swallow the exception
abortTask.run(item); } finally {
failed = false; if (failed) {
} catch (Exception e) { abortFailed.set(true);
LOG.error("Failed to abort task", e); }
// swallow the exception
} finally {
if (failed) {
abortFailed.set(true);
} }
} }
} finally {
resetStatisticsContext();
} }
})); }));
} }
@ -447,7 +465,6 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
// mark as a task failure so all submitted tasks will halt/abort // mark as a task failure so all submitted tasks will halt/abort
taskFailed.set(true); taskFailed.set(true);
} }
// let the above tasks complete (or abort) // let the above tasks complete (or abort)
waitFor(futures, sleepInterval); waitFor(futures, sleepInterval);
int futureCount = futures.size(); int futureCount = futures.size();
@ -464,6 +481,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
} }
boolean failed = true; boolean failed = true;
setStatisticsContext();
try { try {
revertTask.run(item); revertTask.run(item);
failed = false; failed = false;
@ -474,6 +492,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
if (failed) { if (failed) {
revertFailed.set(true); revertFailed.set(true);
} }
resetStatisticsContext();
} }
})); }));
} }
@ -498,6 +517,26 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
// return true if all tasks succeeded. // return true if all tasks succeeded.
return !taskFailed.get(); return !taskFailed.get();
} }
/**
* Set the statistics context for this thread.
*/
private void setStatisticsContext() {
if (ioStatisticsContext != null) {
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
}
}
/**
* Reset the statistics context if it was set earlier.
* This unbinds the current thread from any statistics
* context.
*/
private void resetStatisticsContext() {
if (ioStatisticsContext != null) {
IOStatisticsContext.setThreadIOStatisticsContext(null);
}
}
} }
/** /**

View File

@ -31,7 +31,9 @@
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.RemoteIterators;
@ -333,7 +335,7 @@ interface FileStatusAcceptor {
* Thread safety: None. * Thread safety: None.
*/ */
class FileStatusListingIterator class FileStatusListingIterator
implements RemoteIterator<S3AFileStatus>, IOStatisticsSource { implements RemoteIterator<S3AFileStatus>, IOStatisticsSource, Closeable {
/** Source of objects. */ /** Source of objects. */
private final ObjectListingIterator source; private final ObjectListingIterator source;
@ -403,6 +405,14 @@ public S3AFileStatus next() throws IOException {
return status; return status;
} }
/**
* Close, if called, will update
* the thread statistics context with the value.
*/
@Override
public void close() {
source.close();
}
/** /**
* Try to retrieve another batch. * Try to retrieve another batch.
* Note that for the initial batch, * Note that for the initial batch,
@ -545,6 +555,11 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult>,
private final AuditSpan span; private final AuditSpan span;
/**
* Context statistics aggregator.
*/
private final IOStatisticsAggregator aggregator;
/** The most recent listing results. */ /** The most recent listing results. */
private S3ListResult objects; private S3ListResult objects;
@ -601,6 +616,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult>,
this.span = span; this.span = span;
this.s3ListResultFuture = listingOperationCallbacks this.s3ListResultFuture = listingOperationCallbacks
.listObjectsAsync(request, iostats, span); .listObjectsAsync(request, iostats, span);
this.aggregator = IOStatisticsContext.getCurrentIOStatisticsContext()
.getAggregator();
} }
/** /**
@ -693,11 +710,12 @@ public int getListingCount() {
} }
/** /**
* Close, if actually called, will close the span * Close, if called, will update
* this listing was created with. * the thread statistics context with the value.
*/ */
@Override @Override
public void close() { public void close() {
aggregator.aggregate(getIOStatistics());
} }
} }

View File

@ -41,6 +41,7 @@
import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@ -165,6 +166,9 @@ class S3ABlockOutputStream extends OutputStream implements
/** is client side encryption enabled? */ /** is client side encryption enabled? */
private final boolean isCSEEnabled; private final boolean isCSEEnabled;
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;
/** /**
* An S3A output stream which uploads partitions in a separate pool of * An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory} * threads; different {@link S3ADataBlocks.BlockFactory}
@ -201,6 +205,7 @@ class S3ABlockOutputStream extends OutputStream implements
initMultipartUpload(); initMultipartUpload();
} }
this.isCSEEnabled = builder.isCSEEnabled; this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
} }
/** /**
@ -454,11 +459,23 @@ public void close() throws IOException {
*/ */
private synchronized void cleanupOnClose() { private synchronized void cleanupOnClose() {
cleanupWithLogger(LOG, getActiveBlock(), blockFactory); cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
mergeThreadIOStatistics(statistics.getIOStatistics());
LOG.debug("Statistics: {}", statistics); LOG.debug("Statistics: {}", statistics);
cleanupWithLogger(LOG, statistics); cleanupWithLogger(LOG, statistics);
clearActiveBlock(); clearActiveBlock();
} }
/**
* Merging the current thread's IOStatistics with the current IOStatistics
* context.
*
* @param streamStatistics Stream statistics to be merged into thread
* statistics aggregator.
*/
private void mergeThreadIOStatistics(IOStatistics streamStatistics) {
getThreadIOStatistics().aggregate(streamStatistics);
}
/** /**
* Best effort abort of the multipart upload; sets * Best effort abort of the multipart upload; sets
* the field to null afterwards. * the field to null afterwards.
@ -662,6 +679,10 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.ABORTABLE_STREAM: case StreamCapabilities.ABORTABLE_STREAM:
return true; return true;
// IOStatistics context support for thread-level IOStatistics.
case StreamCapabilities.IOSTATISTICS_CONTEXT:
return true;
default: default:
return false; return false;
} }
@ -701,6 +722,14 @@ public IOStatistics getIOStatistics() {
return iostatistics; return iostatistics;
} }
/**
* Get the IOStatistics aggregator passed in the builder.
* @return an aggregator
*/
protected IOStatisticsAggregator getThreadIOStatistics() {
return threadIOStatisticsAggregator;
}
/** /**
* Multiple partition upload. * Multiple partition upload.
*/ */
@ -1092,6 +1121,11 @@ public static final class BlockOutputStreamBuilder {
*/ */
private PutObjectOptions putOptions; private PutObjectOptions putOptions;
/**
* thread-level IOStatistics Aggregator.
*/
private IOStatisticsAggregator ioStatisticsAggregator;
private BlockOutputStreamBuilder() { private BlockOutputStreamBuilder() {
} }
@ -1108,6 +1142,7 @@ public void validate() {
requireNonNull(putOptions, "null putOptions"); requireNonNull(putOptions, "null putOptions");
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
"Block size is too small: %s", blockSize); "Block size is too small: %s", blockSize);
requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator");
} }
/** /**
@ -1229,5 +1264,17 @@ public BlockOutputStreamBuilder withPutOptions(
putOptions = value; putOptions = value;
return this; return this;
} }
/**
* Set builder value.
*
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withIOStatisticsAggregator(
final IOStatisticsAggregator value) {
ioStatisticsAggregator = value;
return this;
}
} }
} }

View File

@ -131,6 +131,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint; import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource; import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
@ -1576,7 +1577,8 @@ protected S3AReadOpContext createReadContext(
statistics, statistics,
statisticsContext, statisticsContext,
fileStatus, fileStatus,
vectoredIOContext) vectoredIOContext,
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withAuditSpan(auditSpan); .withAuditSpan(auditSpan);
openFileHelper.applyDefaultOptions(roc); openFileHelper.applyDefaultOptions(roc);
return roc.build(); return roc.build();
@ -1743,7 +1745,9 @@ private FSDataOutputStream innerCreateFile(
DOWNGRADE_SYNCABLE_EXCEPTIONS, DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)) DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
.withCSEEnabled(isCSEEnabled) .withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions); .withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
return new FSDataOutputStream( return new FSDataOutputStream(
new S3ABlockOutputStream(builder), new S3ABlockOutputStream(builder),
null); null);

View File

@ -37,6 +37,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
@ -53,9 +54,9 @@
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
@ -187,6 +188,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
private long asyncDrainThreshold; private long asyncDrainThreshold;
/** Aggregator used to aggregate per thread IOStatistics. */
private final IOStatisticsAggregator threadIOStatistics;
/** /**
* Create the stream. * Create the stream.
* This does not attempt to open it; that is only done on the first * This does not attempt to open it; that is only done on the first
@ -225,6 +229,7 @@ public S3AInputStream(S3AReadOpContext ctx,
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold(); this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
this.unboundedThreadPool = unboundedThreadPool; this.unboundedThreadPool = unboundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext(); this.vectoredIOContext = context.getVectoredIOContext();
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
} }
/** /**
@ -600,7 +605,6 @@ public synchronized void close() throws IOException {
stopVectoredIOOperations.set(true); stopVectoredIOOperations.set(true);
// close or abort the stream; blocking // close or abort the stream; blocking
awaitFuture(closeStream("close() operation", false, true)); awaitFuture(closeStream("close() operation", false, true));
LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
// end the client+audit span. // end the client+audit span.
client.close(); client.close();
// this is actually a no-op // this is actually a no-op
@ -608,10 +612,23 @@ public synchronized void close() throws IOException {
} finally { } finally {
// merge the statistics back into the FS statistics. // merge the statistics back into the FS statistics.
streamStatistics.close(); streamStatistics.close();
// Collect ThreadLevel IOStats
mergeThreadIOStatistics(streamStatistics.getIOStatistics());
} }
} }
} }
/**
* Merging the current thread's IOStatistics with the current IOStatistics
* context.
*
* @param streamIOStats Stream statistics to be merged into thread
* statistics aggregator.
*/
private void mergeThreadIOStatistics(IOStatistics streamIOStats) {
threadIOStatistics.aggregate(streamIOStats);
}
/** /**
* Close a stream: decide whether to abort or close, based on * Close a stream: decide whether to abort or close, based on
* the length of the stream and the current position. * the length of the stream and the current position.
@ -1331,6 +1348,7 @@ public synchronized void unbuffer() {
public boolean hasCapability(String capability) { public boolean hasCapability(String capability) {
switch (toLowerCase(capability)) { switch (toLowerCase(capability)) {
case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamCapabilities.READAHEAD: case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER: case StreamCapabilities.UNBUFFER:
case StreamCapabilities.VECTOREDIO: case StreamCapabilities.VECTOREDIO:

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpan;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -70,6 +71,9 @@ public class S3AReadOpContext extends S3AOpContext {
*/ */
private final VectoredIOContext vectoredIOContext; private final VectoredIOContext vectoredIOContext;
/** Thread-level IOStatistics aggregator. **/
private final IOStatisticsAggregator ioStatisticsAggregator;
/** /**
* Instantiate. * Instantiate.
* @param path path of read * @param path path of read
@ -78,6 +82,7 @@ public class S3AReadOpContext extends S3AOpContext {
* @param instrumentation statistics context * @param instrumentation statistics context
* @param dstFileStatus target file status * @param dstFileStatus target file status
* @param vectoredIOContext context for vectored read operation. * @param vectoredIOContext context for vectored read operation.
* @param ioStatisticsAggregator IOStatistics aggregator for each thread.
*/ */
public S3AReadOpContext( public S3AReadOpContext(
final Path path, final Path path,
@ -85,11 +90,13 @@ public S3AReadOpContext(
@Nullable FileSystem.Statistics stats, @Nullable FileSystem.Statistics stats,
S3AStatisticsContext instrumentation, S3AStatisticsContext instrumentation,
FileStatus dstFileStatus, FileStatus dstFileStatus,
VectoredIOContext vectoredIOContext) { VectoredIOContext vectoredIOContext,
IOStatisticsAggregator ioStatisticsAggregator) {
super(invoker, stats, instrumentation, super(invoker, stats, instrumentation,
dstFileStatus); dstFileStatus);
this.path = requireNonNull(path); this.path = requireNonNull(path);
this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext"); this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
this.ioStatisticsAggregator = ioStatisticsAggregator;
} }
/** /**
@ -105,6 +112,7 @@ public S3AReadOpContext build() {
"invalid readahead %d", readahead); "invalid readahead %d", readahead);
Preconditions.checkArgument(asyncDrainThreshold >= 0, Preconditions.checkArgument(asyncDrainThreshold >= 0,
"invalid drainThreshold %d", asyncDrainThreshold); "invalid drainThreshold %d", asyncDrainThreshold);
requireNonNull(ioStatisticsAggregator, "ioStatisticsAggregator");
return this; return this;
} }
@ -215,6 +223,15 @@ public VectoredIOContext getVectoredIOContext() {
return vectoredIOContext; return vectoredIOContext;
} }
/**
* Return the IOStatistics aggregator.
*
* @return instance of IOStatisticsAggregator.
*/
public IOStatisticsAggregator getIOStatisticsAggregator() {
return ioStatisticsAggregator;
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.audit.AuditConstants; import org.apache.hadoop.fs.audit.AuditConstants;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
@ -467,8 +468,7 @@ public void recoverTask(TaskAttemptContext taskContext) throws IOException {
* *
* While the classic committers create a 0-byte file, the S3A committers * While the classic committers create a 0-byte file, the S3A committers
* PUT up a the contents of a {@link SuccessData} file. * PUT up a the contents of a {@link SuccessData} file.
* * @param commitContext commit context
* @param context job context
* @param pending the pending commits * @param pending the pending commits
* *
* @return the success data, even if the marker wasn't created * @return the success data, even if the marker wasn't created
@ -476,7 +476,7 @@ public void recoverTask(TaskAttemptContext taskContext) throws IOException {
* @throws IOException IO failure * @throws IOException IO failure
*/ */
protected SuccessData maybeCreateSuccessMarkerFromCommits( protected SuccessData maybeCreateSuccessMarkerFromCommits(
JobContext context, final CommitContext commitContext,
ActiveCommit pending) throws IOException { ActiveCommit pending) throws IOException {
List<String> filenames = new ArrayList<>(pending.size()); List<String> filenames = new ArrayList<>(pending.size());
// The list of committed objects in pending is size limited in // The list of committed objects in pending is size limited in
@ -488,7 +488,13 @@ protected SuccessData maybeCreateSuccessMarkerFromCommits(
// and the current statistics // and the current statistics
snapshot.aggregate(getIOStatistics()); snapshot.aggregate(getIOStatistics());
return maybeCreateSuccessMarker(context, filenames, snapshot); // and include the context statistics if enabled
if (commitContext.isCollectIOStatistics()) {
snapshot.aggregate(commitContext.getIOStatisticsContext()
.getIOStatistics());
}
return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot);
} }
/** /**
@ -729,6 +735,7 @@ private void loadAndCommit(
final FileStatus status) throws IOException { final FileStatus status) throws IOException {
final Path path = status.getPath(); final Path path = status.getPath();
commitContext.switchToIOStatisticsContext();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, new DurationInfo(LOG,
"Loading and committing files in pendingset %s", path)) { "Loading and committing files in pendingset %s", path)) {
@ -775,6 +782,7 @@ private void loadAndRevert(
final FileStatus status) throws IOException { final FileStatus status) throws IOException {
final Path path = status.getPath(); final Path path = status.getPath();
commitContext.switchToIOStatisticsContext();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) { new DurationInfo(LOG, false, "Committing %s", path)) {
PendingSet pendingSet = PersistentCommitData.load( PendingSet pendingSet = PersistentCommitData.load(
@ -806,6 +814,7 @@ private void loadAndAbort(
final boolean deleteRemoteFiles) throws IOException { final boolean deleteRemoteFiles) throws IOException {
final Path path = status.getPath(); final Path path = status.getPath();
commitContext.switchToIOStatisticsContext();
try (DurationInfo ignored = try (DurationInfo ignored =
new DurationInfo(LOG, false, "Aborting %s", path)) { new DurationInfo(LOG, false, "Aborting %s", path)) {
PendingSet pendingSet = PersistentCommitData.load( PendingSet pendingSet = PersistentCommitData.load(
@ -832,6 +841,8 @@ private void loadAndAbort(
/** /**
* Start the final job commit/abort commit operations. * Start the final job commit/abort commit operations.
* If configured to collect statistics,
* The IO StatisticsContext is reset.
* @param context job context * @param context job context
* @return a commit context through which the operations can be invoked. * @return a commit context through which the operations can be invoked.
* @throws IOException failure. * @throws IOException failure.
@ -840,14 +851,22 @@ protected CommitContext initiateJobOperation(
final JobContext context) final JobContext context)
throws IOException { throws IOException {
return getCommitOperations().createCommitContext( IOStatisticsContext ioStatisticsContext =
IOStatisticsContext.getCurrentIOStatisticsContext();
CommitContext commitContext = getCommitOperations().createCommitContext(
context, context,
getOutputPath(), getOutputPath(),
getJobCommitThreadCount(context)); getJobCommitThreadCount(context),
ioStatisticsContext);
commitContext.maybeResetIOStatisticsContext();
return commitContext;
} }
/** /**
* Start a ask commit/abort commit operations. * Start a ask commit/abort commit operations.
* This may have a different thread count. * This may have a different thread count.
* If configured to collect statistics,
* The IO StatisticsContext is reset.
* @param context job or task context * @param context job or task context
* @return a commit context through which the operations can be invoked. * @return a commit context through which the operations can be invoked.
* @throws IOException failure. * @throws IOException failure.
@ -856,10 +875,13 @@ protected CommitContext initiateTaskOperation(
final JobContext context) final JobContext context)
throws IOException { throws IOException {
return getCommitOperations().createCommitContext( CommitContext commitContext = getCommitOperations().createCommitContext(
context, context,
getOutputPath(), getOutputPath(),
getTaskCommitThreadCount(context)); getTaskCommitThreadCount(context),
IOStatisticsContext.getCurrentIOStatisticsContext());
commitContext.maybeResetIOStatisticsContext();
return commitContext;
} }
/** /**
@ -1014,7 +1036,7 @@ public void commitJob(JobContext context) throws IOException {
stage = "completed"; stage = "completed";
jobCompleted(true); jobCompleted(true);
stage = "marker"; stage = "marker";
successData = maybeCreateSuccessMarkerFromCommits(context, pending); successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending);
stage = "cleanup"; stage = "cleanup";
cleanup(commitContext, false); cleanup(commitContext, false);
} catch (IOException e) { } catch (IOException e) {

View File

@ -354,4 +354,24 @@ private CommitConstants() {
public static final String OPT_SUMMARY_REPORT_DIR = public static final String OPT_SUMMARY_REPORT_DIR =
OPT_PREFIX + "summary.report.directory"; OPT_PREFIX + "summary.report.directory";
/**
* Experimental feature to collect thread level IO statistics.
* When set the committers will reset the statistics in
* task setup and propagate to the job committer.
* The job comitter will include those and its own statistics.
* Do not use if the execution engine is collecting statistics,
* as the multiple reset() operations will result in incomplete
* statistics.
* Value: {@value}.
*/
public static final String S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS =
OPT_PREFIX + "experimental.collect.iostatistics";
/**
* Default value for {@link #S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS}.
* Value: {@value}.
*/
public static final boolean S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT =
false;
} }

View File

@ -21,6 +21,7 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -36,6 +37,7 @@
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -47,6 +49,8 @@
import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS; import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS;
import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.THREAD_PREFIX; import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.THREAD_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.THREAD_KEEP_ALIVE_TIME; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.THREAD_KEEP_ALIVE_TIME;
/** /**
@ -123,24 +127,43 @@ public final class CommitContext implements Closeable {
*/ */
private final int committerThreads; private final int committerThreads;
/**
* Should IOStatistics be collected by the committer?
*/
private final boolean collectIOStatistics;
/**
* IOStatisticsContext to switch to in all threads
* taking part in the commit operation.
* This ensures that the IOStatistics collected in the
* worker threads will be aggregated into the total statistics
* of the thread calling the committer commit/abort methods.
*/
private final IOStatisticsContext ioStatisticsContext;
/** /**
* Create. * Create.
* @param commitOperations commit callbacks * @param commitOperations commit callbacks
* @param jobContext job context * @param jobContext job context
* @param committerThreads number of commit threads * @param committerThreads number of commit threads
* @param ioStatisticsContext IOStatistics context of current thread
*/ */
public CommitContext( public CommitContext(
final CommitOperations commitOperations, final CommitOperations commitOperations,
final JobContext jobContext, final JobContext jobContext,
final int committerThreads) { final int committerThreads,
final IOStatisticsContext ioStatisticsContext) {
this.commitOperations = commitOperations; this.commitOperations = commitOperations;
this.jobContext = jobContext; this.jobContext = jobContext;
this.conf = jobContext.getConfiguration(); this.conf = jobContext.getConfiguration();
this.jobId = jobContext.getJobID().toString(); this.jobId = jobContext.getJobID().toString();
this.collectIOStatistics = conf.getBoolean(
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);
this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext);
this.auditContextUpdater = new AuditContextUpdater(jobContext); this.auditContextUpdater = new AuditContextUpdater(jobContext);
this.auditContextUpdater.updateCurrentAuditContext(); this.auditContextUpdater.updateCurrentAuditContext();
this.committerThreads = committerThreads; this.committerThreads = committerThreads;
buildSubmitters(); buildSubmitters();
} }
@ -152,15 +175,19 @@ public CommitContext(
* @param conf job conf * @param conf job conf
* @param jobId ID * @param jobId ID
* @param committerThreads number of commit threads * @param committerThreads number of commit threads
* @param ioStatisticsContext IOStatistics context of current thread
*/ */
public CommitContext(final CommitOperations commitOperations, public CommitContext(final CommitOperations commitOperations,
final Configuration conf, final Configuration conf,
final String jobId, final String jobId,
final int committerThreads) { final int committerThreads,
final IOStatisticsContext ioStatisticsContext) {
this.commitOperations = commitOperations; this.commitOperations = commitOperations;
this.jobContext = null; this.jobContext = null;
this.conf = conf; this.conf = conf;
this.jobId = jobId; this.jobId = jobId;
this.collectIOStatistics = false;
this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext);
this.auditContextUpdater = new AuditContextUpdater(jobId); this.auditContextUpdater = new AuditContextUpdater(jobId);
this.auditContextUpdater.updateCurrentAuditContext(); this.auditContextUpdater.updateCurrentAuditContext();
this.committerThreads = committerThreads; this.committerThreads = committerThreads;
@ -358,6 +385,44 @@ public String getJobId() {
return jobId; return jobId;
} }
/**
* Collecting thread level IO statistics?
* @return true if thread level IO stats should be collected.
*/
public boolean isCollectIOStatistics() {
return collectIOStatistics;
}
/**
* IOStatistics context of the created thread.
* @return the IOStatistics.
*/
public IOStatisticsContext getIOStatisticsContext() {
return ioStatisticsContext;
}
/**
* Switch to the context IOStatistics context,
* if needed.
*/
public void switchToIOStatisticsContext() {
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
}
/**
* Reset the IOStatistics context if statistics are being
* collected.
* Logs at info.
*/
public void maybeResetIOStatisticsContext() {
if (collectIOStatistics) {
LOG.info("Resetting IO statistics context {}",
ioStatisticsContext.getID());
ioStatisticsContext.reset();
}
}
/** /**
* Submitter for a given thread pool. * Submitter for a given thread pool.
*/ */

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
@ -639,15 +640,18 @@ public void jobCompleted(boolean success) {
* @param context job context * @param context job context
* @param path path for all work. * @param path path for all work.
* @param committerThreads thread pool size * @param committerThreads thread pool size
* @param ioStatisticsContext IOStatistics context of current thread
* @return the commit context to pass in. * @return the commit context to pass in.
* @throws IOException failure. * @throws IOException failure.
*/ */
public CommitContext createCommitContext( public CommitContext createCommitContext(
JobContext context, JobContext context,
Path path, Path path,
int committerThreads) throws IOException { int committerThreads,
IOStatisticsContext ioStatisticsContext) throws IOException {
return new CommitContext(this, context, return new CommitContext(this, context,
committerThreads); committerThreads,
ioStatisticsContext);
} }
/** /**
@ -668,7 +672,8 @@ public CommitContext createCommitContextForTesting(
return new CommitContext(this, return new CommitContext(this,
getStoreContext().getConfiguration(), getStoreContext().getConfiguration(),
id, id,
committerThreads); committerThreads,
IOStatisticsContext.getCurrentIOStatisticsContext());
} }
/** /**

View File

@ -219,6 +219,13 @@ private PendingSet innerCommitTask(
} }
pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId); pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
pendingSet.setJobId(jobId); pendingSet.setJobId(jobId);
// add in the IOStatistics of all the file loading
if (commitContext.isCollectIOStatistics()) {
pendingSet.getIOStatistics()
.aggregate(
commitContext.getIOStatisticsContext().getIOStatistics());
}
Path jobAttemptPath = getJobAttemptPath(context); Path jobAttemptPath = getJobAttemptPath(context);
TaskAttemptID taskAttemptID = context.getTaskAttemptID(); TaskAttemptID taskAttemptID = context.getTaskAttemptID();
Path taskOutcomePath = new Path(jobAttemptPath, Path taskOutcomePath = new Path(jobAttemptPath,

View File

@ -696,6 +696,13 @@ protected int commitTaskInternal(final TaskAttemptContext context,
pendingCommits.add(commit); pendingCommits.add(commit);
} }
// maybe add in the IOStatistics the thread
if (commitContext.isCollectIOStatistics()) {
pendingCommits.getIOStatistics().aggregate(
commitContext.getIOStatisticsContext()
.getIOStatistics());
}
// save the data // save the data
// overwrite any existing file, so whichever task attempt // overwrite any existing file, so whichever task attempt
// committed last wins. // committed last wins.

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.s3a.tools.MarkerTool; import org.apache.hadoop.fs.s3a.tools.MarkerTool;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -38,6 +39,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
@ -66,6 +68,15 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
*/ */
private AuditSpanSource spanSource; private AuditSpanSource spanSource;
/**
* Atomic references to be used to re-throw an Exception or an ASE
* caught inside a lambda function.
*/
private static final AtomicReference<Exception> FUTURE_EXCEPTION =
new AtomicReference<>();
private static final AtomicReference<AssertionError> FUTURE_ASE =
new AtomicReference<>();
/** /**
* Get the source. * Get the source.
* @return span source * @return span source
@ -99,6 +110,9 @@ public void setup() throws Exception {
S3AFileSystem.initializeClass(); S3AFileSystem.initializeClass();
super.setup(); super.setup();
setSpanSource(getFileSystem()); setSpanSource(getFileSystem());
// Reset the current context's thread IOStatistics.`
// this ensures that the context stats will always be from the test case
IOStatisticsContext.getCurrentIOStatisticsContext().reset();
} }
@Override @Override
@ -263,4 +277,53 @@ public void skipIfClientSideEncryption() {
Assume.assumeTrue("Skipping test if CSE is enabled", Assume.assumeTrue("Skipping test if CSE is enabled",
!getFileSystem().isCSEEnabled()); !getFileSystem().isCSEEnabled());
} }
/**
* If an exception is caught while doing some work in a Lambda function,
* store it in an atomic reference to be thrown later on.
* @param exception Exception caught.
*/
public static void setFutureException(Exception exception) {
FUTURE_EXCEPTION.set(exception);
}
/**
* If an Assertion is caught while doing some work in a Lambda function,
* store it in an atomic reference to be thrown later on.
*
* @param ase Assertion Error caught.
*/
public static void setFutureAse(AssertionError ase) {
FUTURE_ASE.set(ase);
}
/**
* throw the caught exception from the atomic reference and also clear the
* atomic reference so that we don't rethrow in another test.
*
* @throws Exception the exception caught.
*/
public static void maybeReThrowFutureException() throws Exception {
if (FUTURE_EXCEPTION.get() != null) {
Exception exceptionToThrow = FUTURE_EXCEPTION.get();
// reset the atomic ref before throwing.
setFutureAse(null);
throw exceptionToThrow;
}
}
/**
* throw the Assertion error from the atomic reference and also clear the
* atomic reference so that we don't rethrow in another test.
*
* @throws Exception Assertion error caught.
*/
public static void maybeReThrowFutureASE() throws Exception {
if (FUTURE_ASE.get() != null) {
AssertionError aseToThrow = FUTURE_ASE.get();
// reset the atomic ref before throwing.
setFutureAse(null);
throw aseToThrow;
}
}
} }

View File

@ -0,0 +1,487 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextImpl;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.enableIOStatisticsContext;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getCurrentIOStatisticsContext;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getThreadSpecificIOStatisticsContext;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.setThreadIOStatisticsContext;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
/**
* Tests to verify the Thread-level IOStatistics.
*/
public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
private static final int SMALL_THREADS = 2;
private static final int BYTES_BIG = 100;
private static final int BYTES_SMALL = 50;
private static final String[] IOSTATISTICS_CONTEXT_CAPABILITY =
new String[] {StreamCapabilities.IOSTATISTICS_CONTEXT};
private ExecutorService executor;
@Override
protected Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
enableIOStatisticsContext();
return configuration;
}
@Override
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
}
@Override
public void teardown() throws Exception {
if (executor != null) {
executor.shutdown();
}
super.teardown();
}
/**
* Verify that S3AInputStream aggregates per thread IOStats collection
* correctly.
*/
@Test
public void testS3AInputStreamIOStatisticsContext()
throws Exception {
S3AFileSystem fs = getFileSystem();
Path path = path(getMethodName());
byte[] data = dataset(256, 'a', 'z');
byte[] readDataFirst = new byte[BYTES_BIG];
byte[] readDataSecond = new byte[BYTES_SMALL];
writeDataset(fs, path, data, data.length, 1024, true);
CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
try {
for (int i = 0; i < SMALL_THREADS; i++) {
executor.submit(() -> {
try {
// get the thread context and reset
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
try (FSDataInputStream in = fs.open(path)) {
// Assert the InputStream's stream capability to support
// IOStatisticsContext.
assertCapabilities(in, IOSTATISTICS_CONTEXT_CAPABILITY, null);
in.seek(50);
in.read(readDataFirst);
}
assertContextBytesRead(context, BYTES_BIG);
// Stream is closed for a thread. Re-open and do more operations.
try (FSDataInputStream in = fs.open(path)) {
in.seek(100);
in.read(readDataSecond);
}
assertContextBytesRead(context, BYTES_BIG + BYTES_SMALL);
latch.countDown();
} catch (Exception e) {
latch.countDown();
setFutureException(e);
LOG.error("An error occurred while doing a task in the thread", e);
} catch (AssertionError ase) {
latch.countDown();
setFutureAse(ase);
throw ase;
}
});
}
// wait for tasks to finish.
latch.await();
} finally {
executor.shutdown();
}
// Check if an Exception or ASE was caught while the test threads were running.
maybeReThrowFutureException();
maybeReThrowFutureASE();
}
/**
* get the thread context and reset.
* @return thread context
*/
private static IOStatisticsContext getAndResetThreadStatisticsContext() {
IOStatisticsContext context =
IOStatisticsContext.getCurrentIOStatisticsContext();
context.reset();
return context;
}
/**
* Verify that S3ABlockOutputStream aggregates per thread IOStats collection
* correctly.
*/
@Test
public void testS3ABlockOutputStreamIOStatisticsContext()
throws Exception {
S3AFileSystem fs = getFileSystem();
Path path = path(getMethodName());
byte[] writeDataFirst = new byte[BYTES_BIG];
byte[] writeDataSecond = new byte[BYTES_SMALL];
final ExecutorService executorService =
HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
try {
for (int i = 0; i < SMALL_THREADS; i++) {
executorService.submit(() -> {
try {
// get the thread context and reset
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
try (FSDataOutputStream out = fs.create(path)) {
// Assert the OutputStream's stream capability to support
// IOStatisticsContext.
assertCapabilities(out, IOSTATISTICS_CONTEXT_CAPABILITY, null);
out.write(writeDataFirst);
}
assertContextBytesWrite(context, BYTES_BIG);
// Stream is closed for a thread. Re-open and do more operations.
try (FSDataOutputStream out = fs.create(path)) {
out.write(writeDataSecond);
}
assertContextBytesWrite(context, BYTES_BIG + BYTES_SMALL);
latch.countDown();
} catch (Exception e) {
latch.countDown();
setFutureException(e);
LOG.error("An error occurred while doing a task in the thread", e);
} catch (AssertionError ase) {
latch.countDown();
setFutureAse(ase);
throw ase;
}
});
}
// wait for tasks to finish.
latch.await();
} finally {
executorService.shutdown();
}
// Check if an Excp or ASE was caught while the test threads were running.
maybeReThrowFutureException();
maybeReThrowFutureASE();
}
/**
* Verify stats collection and aggregation for constructor thread, Junit
* thread and a worker thread.
*/
@Test
public void testThreadIOStatisticsForDifferentThreads()
throws IOException, InterruptedException {
S3AFileSystem fs = getFileSystem();
Path path = path(getMethodName());
byte[] data = new byte[BYTES_BIG];
long threadIdForTest = Thread.currentThread().getId();
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
Assertions.assertThat(((IOStatisticsContextImpl)context).getThreadID())
.describedAs("Thread ID of %s", context)
.isEqualTo(threadIdForTest);
Assertions.assertThat(((IOStatisticsContextImpl)context).getID())
.describedAs("ID of %s", context)
.isGreaterThan(0);
// Write in the Junit thread.
try (FSDataOutputStream out = fs.create(path)) {
out.write(data);
}
// Read in the Junit thread.
try (FSDataInputStream in = fs.open(path)) {
in.read(data);
}
// Worker thread work and wait for it to finish.
TestWorkerThread workerThread = new TestWorkerThread(path, null);
long workerThreadID = workerThread.getId();
workerThread.start();
workerThread.join();
assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
}
/**
* Verify stats collection and aggregation for constructor thread, Junit
* thread and a worker thread.
*/
@Test
public void testThreadSharingIOStatistics()
throws IOException, InterruptedException {
S3AFileSystem fs = getFileSystem();
Path path = path(getMethodName());
byte[] data = new byte[BYTES_BIG];
long threadIdForTest = Thread.currentThread().getId();
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
// Write in the Junit thread.
try (FSDataOutputStream out = fs.create(path)) {
out.write(data);
}
// Read in the Junit thread.
try (FSDataInputStream in = fs.open(path)) {
in.read(data);
}
// Worker thread will share the same context.
TestWorkerThread workerThread = new TestWorkerThread(path, context);
long workerThreadID = workerThread.getId();
workerThread.start();
workerThread.join();
assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG + BYTES_SMALL);
}
/**
* Test to verify if setting the current IOStatisticsContext removes the
* current context and creates a new instance of it.
*/
@Test
public void testSettingNullIOStatisticsContext() {
IOStatisticsContext ioStatisticsContextBefore =
getCurrentIOStatisticsContext();
// Set the current IOStatisticsContext to null, which should remove the
// context and set a new one.
setThreadIOStatisticsContext(null);
// Get the context again after setting.
IOStatisticsContext ioStatisticsContextAfter =
getCurrentIOStatisticsContext();
//Verify the context ID after setting to null is different than the previous
// one.
Assertions.assertThat(ioStatisticsContextBefore.getID())
.describedAs("A new IOStaticsContext should be set after setting the "
+ "current to null")
.isNotEqualTo(ioStatisticsContextAfter.getID());
}
/**
* Assert bytes written by the statistics context.
*
* @param context statistics context.
* @param bytes expected bytes.
*/
private void assertContextBytesWrite(IOStatisticsContext context,
int bytes) {
verifyStatisticCounterValue(
context.getIOStatistics(),
STREAM_WRITE_BYTES,
bytes);
}
/**
* Assert bytes read by the statistics context.
*
* @param context statistics context.
* @param readBytes expected bytes.
*/
private void assertContextBytesRead(IOStatisticsContext context,
int readBytes) {
verifyStatisticCounterValue(
context.getIOStatistics(),
STREAM_READ_BYTES,
readBytes);
}
/**
* Assert fixed bytes wrote and read for a particular thread ID.
*
* @param testThreadId thread ID.
* @param expectedBytesWrittenAndRead expected bytes.
*/
private void assertThreadStatisticsForThread(long testThreadId,
int expectedBytesWrittenAndRead) {
LOG.info("Thread ID to be asserted: {}", testThreadId);
IOStatisticsContext ioStatisticsContext =
getThreadSpecificIOStatisticsContext(testThreadId);
Assertions.assertThat(ioStatisticsContext)
.describedAs("IOStatisticsContext for %d", testThreadId)
.isNotNull();
IOStatistics ioStatistics = ioStatisticsContext.snapshot();
assertThatStatisticCounter(ioStatistics,
STREAM_WRITE_BYTES)
.describedAs("Bytes written are not as expected for thread : %s",
testThreadId)
.isEqualTo(expectedBytesWrittenAndRead);
assertThatStatisticCounter(ioStatistics,
STREAM_READ_BYTES)
.describedAs("Bytes read are not as expected for thread : %s",
testThreadId)
.isEqualTo(expectedBytesWrittenAndRead);
}
@Test
public void testListingStatisticsContext() throws Throwable {
describe("verify the list operations update on close()");
S3AFileSystem fs = getFileSystem();
Path path = methodPath();
fs.mkdirs(methodPath());
// after all setup, get the reset context
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
IOStatistics ioStatistics = context.getIOStatistics();
fs.listStatus(path);
verifyStatisticCounterValue(ioStatistics,
StoreStatisticNames.OBJECT_LIST_REQUEST,
1);
context.reset();
foreach(fs.listStatusIterator(path), i -> {});
verifyStatisticCounterValue(ioStatistics,
StoreStatisticNames.OBJECT_LIST_REQUEST,
1);
context.reset();
foreach(fs.listLocatedStatus(path), i -> {});
verifyStatisticCounterValue(ioStatistics,
StoreStatisticNames.OBJECT_LIST_REQUEST,
1);
context.reset();
foreach(fs.listFiles(path, true), i -> {});
verifyStatisticCounterValue(ioStatistics,
StoreStatisticNames.OBJECT_LIST_REQUEST,
1);
}
@Test
public void testListingThroughTaskPool() throws Throwable {
describe("verify the list operations are updated through taskpool");
S3AFileSystem fs = getFileSystem();
Path path = methodPath();
fs.mkdirs(methodPath());
// after all setup, get the reset context
IOStatisticsContext context =
getAndResetThreadStatisticsContext();
IOStatistics ioStatistics = context.getIOStatistics();
CloseableTaskPoolSubmitter submitter =
new CloseableTaskPoolSubmitter(executor);
TaskPool.foreach(fs.listStatusIterator(path))
.executeWith(submitter)
.run(i -> {});
verifyStatisticCounterValue(ioStatistics,
StoreStatisticNames.OBJECT_LIST_REQUEST,
1);
}
/**
* Simulating doing some work in a separate thread.
* If constructed with an IOStatisticsContext then
* that context is switched to before performing the IO.
*/
private class TestWorkerThread extends Thread implements Runnable {
private final Path workerThreadPath;
private final IOStatisticsContext ioStatisticsContext;
/**
* create.
* @param workerThreadPath thread path.
* @param ioStatisticsContext optional statistics context *
*/
TestWorkerThread(
final Path workerThreadPath,
final IOStatisticsContext ioStatisticsContext) {
this.workerThreadPath = workerThreadPath;
this.ioStatisticsContext = ioStatisticsContext;
}
@Override
public void run() {
S3AFileSystem fs = getFileSystem();
byte[] data = new byte[BYTES_SMALL];
// maybe switch context
if (ioStatisticsContext != null) {
IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
}
// Write in the worker thread.
try (FSDataOutputStream out = fs.create(workerThreadPath)) {
out.write(data);
} catch (IOException e) {
throw new UncheckedIOException("Failure while writing", e);
}
//Read in the worker thread.
try (FSDataInputStream in = fs.open(workerThreadPath)) {
in.read(data);
} catch (IOException e) {
throw new UncheckedIOException("Failure while reading", e);
}
}
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.s3a.commit.PutTracker; import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -67,7 +68,11 @@ private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() {
.withProgress(progressable) .withProgress(progressable)
.withPutTracker(putTracker) .withPutTracker(putTracker)
.withWriteOperations(oHelper) .withWriteOperations(oHelper)
.withPutOptions(PutObjectOptions.keepingDirs()); .withPutOptions(PutObjectOptions.keepingDirs())
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext()
.getAggregator());
return builder; return builder;
} }

View File

@ -26,6 +26,7 @@
import java.util.List; import java.util.List;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,6 +39,9 @@
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData; import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -66,6 +70,12 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AbstractCommitITest.class); LoggerFactory.getLogger(AbstractCommitITest.class);
/**
* Job statistics accrued across all test cases.
*/
private static final IOStatisticsSnapshot JOB_STATISTICS =
IOStatisticsSupport.snapshotIOStatistics();
/** /**
* Helper class for commit operations and assertions. * Helper class for commit operations and assertions.
*/ */
@ -92,7 +102,8 @@ protected Configuration createConfiguration() {
FS_S3A_COMMITTER_NAME, FS_S3A_COMMITTER_NAME,
FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
FAST_UPLOAD_BUFFER); FAST_UPLOAD_BUFFER,
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS);
conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED); conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED);
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
@ -100,9 +111,15 @@ protected Configuration createConfiguration() {
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY); conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
// and bind the report dir // and bind the report dir
conf.set(OPT_SUMMARY_REPORT_DIR, reportDir.toURI().toString()); conf.set(OPT_SUMMARY_REPORT_DIR, reportDir.toURI().toString());
conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, true);
return conf; return conf;
} }
@AfterClass
public static void printStatistics() {
LOG.info("Aggregate job statistics {}\n",
IOStatisticsLogging.ioStatisticsToPrettyString(JOB_STATISTICS));
}
/** /**
* Get the log; can be overridden for test case log. * Get the log; can be overridden for test case log.
* @return a log. * @return a log.
@ -397,6 +414,8 @@ public static SuccessData validateSuccessFile(final Path outputPath,
/** /**
* Load a success file; fail if the file is empty/nonexistent. * Load a success file; fail if the file is empty/nonexistent.
* The statistics in {@link #JOB_STATISTICS} are updated with
* the statistics from the success file
* @param fs filesystem * @param fs filesystem
* @param outputPath directory containing the success file. * @param outputPath directory containing the success file.
* @param origin origin of the file * @param origin origin of the file
@ -426,6 +445,8 @@ public static SuccessData loadSuccessFile(final FileSystem fs,
String body = ContractTestUtils.readUTF8(fs, success, -1); String body = ContractTestUtils.readUTF8(fs, success, -1);
LOG.info("Loading committer success file {}. Actual contents=\n{}", success, LOG.info("Loading committer success file {}. Actual contents=\n{}", success,
body); body);
return SuccessData.load(fs, success); SuccessData successData = SuccessData.load(fs, success);
JOB_STATISTICS.aggregate(successData.getIOStatistics());
return successData;
} }
} }

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext; import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations; import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
@ -92,7 +93,8 @@ protected void verifyFailureConflictOutcome() throws Exception {
// this is done by calling the preCommit method directly, // this is done by calling the preCommit method directly,
final CommitContext commitContext = new CommitOperations(getWrapperFS()). final CommitContext commitContext = new CommitOperations(getWrapperFS()).
createCommitContext(getJob(), getOutputPath(), 0); createCommitContext(getJob(), getOutputPath(), 0,
IOStatisticsContext.getCurrentIOStatisticsContext());
committer.preCommitJob(commitContext, AbstractS3ACommitter.ActiveCommit.empty()); committer.preCommitJob(commitContext, AbstractS3ACommitter.ActiveCommit.empty());
reset(mockFS); reset(mockFS);

View File

@ -37,6 +37,7 @@
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
/** ITest of the low level protocol methods. */ /** ITest of the low level protocol methods. */
public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol { public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
@ -51,6 +52,14 @@ protected String getCommitterName() {
return CommitConstants.COMMITTER_NAME_DIRECTORY; return CommitConstants.COMMITTER_NAME_DIRECTORY;
} }
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
// turn off stats collection to verify that it works
conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, false);
return conf;
}
@Override @Override
protected AbstractS3ACommitter createCommitter( protected AbstractS3ACommitter createCommitter(
Path outputPath, Path outputPath,