HADOOP-19203. WrappedIO BulkDelete API to raise IOEs as UncheckedIOExceptions (#6885)
* WrappedIO methods raise UncheckedIOExceptions *New class org.apache.hadoop.util.functional.FunctionalIO with wrap/unwrap and the ability to generate a java.util.function.Supplier around a CallableRaisingIOE. Contributed by Steve Loughran
This commit is contained in:
parent
6545b7eeef
commit
8ac9c1839a
@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.io.wrappedio;
|
package org.apache.hadoop.io.wrappedio;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.UncheckedIOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -29,17 +29,19 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reflection-friendly access to APIs which are not available in
|
* Reflection-friendly access to APIs which are not available in
|
||||||
* some of the older Hadoop versions which libraries still
|
* some of the older Hadoop versions which libraries still
|
||||||
* compile against.
|
* compile against.
|
||||||
* <p>
|
* <p>
|
||||||
* The intent is to avoid the need for complex reflection operations
|
* The intent is to avoid the need for complex reflection operations
|
||||||
* including wrapping of parameter classes, direct instatiation of
|
* including wrapping of parameter classes, direct instantiation of
|
||||||
* new classes etc.
|
* new classes etc.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Unstable
|
||||||
public final class WrappedIO {
|
public final class WrappedIO {
|
||||||
|
|
||||||
private WrappedIO() {
|
private WrappedIO() {
|
||||||
@ -52,12 +54,15 @@ private WrappedIO() {
|
|||||||
* @return a number greater than or equal to zero.
|
* @return a number greater than or equal to zero.
|
||||||
* @throws UnsupportedOperationException bulk delete under that path is not supported.
|
* @throws UnsupportedOperationException bulk delete under that path is not supported.
|
||||||
* @throws IllegalArgumentException path not valid.
|
* @throws IllegalArgumentException path not valid.
|
||||||
* @throws IOException problems resolving paths
|
* @throws UncheckedIOException if an IOE was raised.
|
||||||
*/
|
*/
|
||||||
public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOException {
|
public static int bulkDelete_pageSize(FileSystem fs, Path path) {
|
||||||
|
|
||||||
|
return uncheckIOExceptions(() -> {
|
||||||
try (BulkDelete bulk = fs.createBulkDelete(path)) {
|
try (BulkDelete bulk = fs.createBulkDelete(path)) {
|
||||||
return bulk.pageSize();
|
return bulk.pageSize();
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -79,15 +84,17 @@ public static int bulkDelete_pageSize(FileSystem fs, Path path) throws IOExcepti
|
|||||||
* @param paths list of paths which must be absolute and under the base path.
|
* @param paths list of paths which must be absolute and under the base path.
|
||||||
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
|
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
|
||||||
* @throws UnsupportedOperationException bulk delete under that path is not supported.
|
* @throws UnsupportedOperationException bulk delete under that path is not supported.
|
||||||
* @throws IOException IO problems including networking, authentication and more.
|
* @throws UncheckedIOException if an IOE was raised.
|
||||||
* @throws IllegalArgumentException if a path argument is invalid.
|
* @throws IllegalArgumentException if a path argument is invalid.
|
||||||
*/
|
*/
|
||||||
public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
|
public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
|
||||||
Path base,
|
Path base,
|
||||||
Collection<Path> paths)
|
Collection<Path> paths) {
|
||||||
throws IOException {
|
|
||||||
|
return uncheckIOExceptions(() -> {
|
||||||
try (BulkDelete bulk = fs.createBulkDelete(base)) {
|
try (BulkDelete bulk = fs.createBulkDelete(base)) {
|
||||||
return bulk.bulkDelete(paths);
|
return bulk.bulkDelete(paths);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@
|
|||||||
* raised by the callable and wrapping them as appropriate.
|
* raised by the callable and wrapping them as appropriate.
|
||||||
* @param <T> return type.
|
* @param <T> return type.
|
||||||
*/
|
*/
|
||||||
public final class CommonCallableSupplier<T> implements Supplier {
|
public final class CommonCallableSupplier<T> implements Supplier<T> {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(CommonCallableSupplier.class);
|
LoggerFactory.getLogger(CommonCallableSupplier.class);
|
||||||
@ -57,7 +57,7 @@ public CommonCallableSupplier(final Callable<T> call) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object get() {
|
public T get() {
|
||||||
try {
|
try {
|
||||||
return call.call();
|
return call.call();
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
@ -155,4 +155,5 @@ public static void maybeAwaitCompletion(
|
|||||||
waitForCompletion(future);
|
waitForCompletion(future);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,99 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.util.functional;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Functional utilities for IO operations.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public final class FunctionalIO {
|
||||||
|
|
||||||
|
private FunctionalIO() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke any operation, wrapping IOExceptions with
|
||||||
|
* {@code UncheckedIOException}.
|
||||||
|
* @param call callable
|
||||||
|
* @param <T> type of result
|
||||||
|
* @return result
|
||||||
|
* @throws UncheckedIOException if an IOE was raised.
|
||||||
|
*/
|
||||||
|
public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
|
||||||
|
try {
|
||||||
|
return call.apply();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
|
||||||
|
* This is similar to {@link CommonCallableSupplier}, except that
|
||||||
|
* only IOExceptions are caught and wrapped; all other exceptions are
|
||||||
|
* propagated unchanged.
|
||||||
|
* @param <T> type of result
|
||||||
|
*/
|
||||||
|
private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {
|
||||||
|
|
||||||
|
private final CallableRaisingIOE<T> call;
|
||||||
|
|
||||||
|
private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
|
||||||
|
this.call = call;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get() {
|
||||||
|
return uncheckIOExceptions(call);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
|
||||||
|
* @param call call to wrap
|
||||||
|
* @param <T> type of result
|
||||||
|
* @return a supplier which invokes the call.
|
||||||
|
*/
|
||||||
|
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
|
||||||
|
return new UncheckedIOExceptionSupplier<>(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke the supplier, catching any {@code UncheckedIOException} raised,
|
||||||
|
* extracting the inner IOException and rethrowing it.
|
||||||
|
* @param call call to invoke
|
||||||
|
* @param <T> type of result
|
||||||
|
* @return result
|
||||||
|
* @throws IOException if the call raised an IOException wrapped by an UncheckedIOException.
|
||||||
|
*/
|
||||||
|
public static <T> T extractIOExceptions(Supplier<T> call) throws IOException {
|
||||||
|
try {
|
||||||
|
return call.get();
|
||||||
|
} catch (UncheckedIOException e) {
|
||||||
|
throw e.getCause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,97 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.util.functional;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
import static org.apache.hadoop.util.functional.FunctionalIO.extractIOExceptions;
|
||||||
|
import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier;
|
||||||
|
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the functional IO class.
|
||||||
|
*/
|
||||||
|
public class TestFunctionalIO extends AbstractHadoopTestBase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that IOEs are caught and wrapped.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUncheckIOExceptions() throws Throwable {
|
||||||
|
final IOException raised = new IOException("text");
|
||||||
|
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
|
||||||
|
uncheckIOExceptions(() -> {
|
||||||
|
throw raised;
|
||||||
|
}));
|
||||||
|
Assertions.assertThat(ex.getCause())
|
||||||
|
.describedAs("Cause of %s", ex)
|
||||||
|
.isSameAs(raised);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that UncheckedIOEs are not double wrapped.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUncheckIOExceptionsUnchecked() throws Throwable {
|
||||||
|
final UncheckedIOException raised = new UncheckedIOException(
|
||||||
|
new IOException("text"));
|
||||||
|
final UncheckedIOException ex = intercept(UncheckedIOException.class, "text", () ->
|
||||||
|
uncheckIOExceptions(() -> {
|
||||||
|
throw raised;
|
||||||
|
}));
|
||||||
|
Assertions.assertThat(ex)
|
||||||
|
.describedAs("Propagated Exception %s", ex)
|
||||||
|
.isSameAs(raised);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Supplier will also wrap IOEs.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUncheckedSupplier() throws Throwable {
|
||||||
|
intercept(UncheckedIOException.class, "text", () ->
|
||||||
|
toUncheckedIOExceptionSupplier(() -> {
|
||||||
|
throw new IOException("text");
|
||||||
|
}).get());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The wrap/unwrap code which will be used to invoke operations
|
||||||
|
* through reflection.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUncheckAndExtract() throws Throwable {
|
||||||
|
final IOException raised = new IOException("text");
|
||||||
|
final IOException ex = intercept(IOException.class, "text", () ->
|
||||||
|
extractIOExceptions(toUncheckedIOExceptionSupplier(() -> {
|
||||||
|
throw raised;
|
||||||
|
})));
|
||||||
|
Assertions.assertThat(ex)
|
||||||
|
.describedAs("Propagated Exception %s", ex)
|
||||||
|
.isSameAs(raised);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user