From 1f71c4ae71427a8a7476eaef64187a5643596552 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein Date: Wed, 15 Jul 2020 11:39:06 -0500 Subject: [PATCH] HADOOP-17099. Replace Guava Predicate with Java8+ Predicate Signed-off-by: Jonathan Eagles --- .../main/resources/checkstyle/checkstyle.xml | 2 +- .../hadoop/metrics2/impl/MetricsRecords.java | 26 ++++--- .../metrics2/impl/TestMetricsSystemImpl.java | 30 +++---- .../CombinedHostFileManager.java | 78 +++++++------------ .../namenode/NameNodeResourceChecker.java | 25 +++--- .../server/namenode/snapshot/Snapshot.java | 19 ++--- .../logaggregation/AggregatedLogFormat.java | 12 +-- .../LogAggregationFileController.java | 23 ++---- .../LogAggregationIndexedFileController.java | 21 ++--- .../logaggregation/AppLogAggregatorImpl.java | 16 +--- 10 files changed, 91 insertions(+), 161 deletions(-) diff --git a/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml b/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml index e0a55f70a6..3e6ae7a927 100644 --- a/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml +++ b/hadoop-build-tools/src/main/resources/checkstyle/checkstyle.xml @@ -123,7 +123,7 @@ + value="^com\.google\.common\.base\.(Optional|Function|Predicate), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/> diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java index 5d52cad66b..786571441f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java @@ -18,8 +18,8 @@ package org.apache.hadoop.metrics2.impl; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; +import java.util.function.Predicate; +import java.util.stream.StreamSupport; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsTag; @@ -65,16 +65,22 @@ public static void assertMetricNotNull(MetricsRecord record, resourceLimitMetric); } - private static MetricsTag getFirstTagByName(MetricsRecord record, String name) { - return Iterables.getFirst(Iterables.filter(record.tags(), - new MetricsTagPredicate(name)), null); + private static MetricsTag getFirstTagByName(MetricsRecord record, + String name) { + if (record.tags() == null) { + return null; + } + return record.tags().stream().filter( + new MetricsTagPredicate(name)).findFirst().orElse(null); } private static AbstractMetric getFirstMetricByName( MetricsRecord record, String name) { - return Iterables.getFirst( - Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)), - null); + if (record.metrics() == null) { + return null; + } + return StreamSupport.stream(record.metrics().spliterator(), false) + .filter(new AbstractMetricPredicate(name)).findFirst().orElse(null); } private static class MetricsTagPredicate implements Predicate { @@ -86,7 +92,7 @@ public MetricsTagPredicate(String tagName) { } @Override - public boolean apply(MetricsTag input) { + public boolean test(MetricsTag input) { return input.name().equals(tagName); } } @@ -101,7 +107,7 @@ public AbstractMetricPredicate( } @Override - public boolean apply(AbstractMetric input) { + public boolean test(AbstractMetric input) { return input.name().equals(metricName); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index 1b40a17bdd..fd6f1672f9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -23,9 +23,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; - -import javax.annotation.Nullable; - +import java.util.stream.StreamSupport; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,7 +36,6 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; @@ -59,7 +56,6 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,13 +242,9 @@ public void run() { for (Thread t : threads) t.join(); assertEquals(0L, ms.droppedPubAll.value()); - assertTrue(StringUtils.join("\n", Arrays.asList(results)), - Iterables.all(Arrays.asList(results), new Predicate() { - @Override - public boolean apply(@Nullable String input) { - return input.equalsIgnoreCase("Passed"); - } - })); + assertTrue(String.join("\n", Arrays.asList(results)), + Arrays.asList(results).stream().allMatch( + input -> input.equalsIgnoreCase("Passed"))); ms.stop(); ms.shutdown(); } @@ -482,14 +474,12 @@ public Object answer(InvocationOnMock invocation) throws Throwable { ms.onTimerEvent(); verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture()); List mr = r1.getAllValues(); - Number qSize = Iterables.find(mr.get(1).metrics(), - new Predicate() { - @Override - public boolean apply(@Nullable AbstractMetric input) { - assert input != null; - return input.name().equals("Sink_slowSinkQsize"); - } - }).value(); + Number qSize = StreamSupport.stream(mr.get(1).metrics().spliterator(), + false).filter( + input -> { + assert input != null; + return input.name().equals("Sink_slowSinkQsize"); + }).findFirst().get().value(); assertEquals(1, qSize); } finally { proceedSignal.countDown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java index d607789420..9a43014a76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CombinedHostFileManager.java @@ -21,9 +21,8 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.UnmodifiableIterator; -import com.google.common.collect.Iterables; -import com.google.common.collect.Collections2; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -40,7 +39,7 @@ import java.util.Iterator; import java.util.Map; -import com.google.common.base.Predicate; + import org.apache.hadoop.hdfs.util.CombinedHostsFileReader; @@ -82,37 +81,26 @@ synchronized void add(InetAddress addr, // If the includes list is empty, act as if everything is in the // includes list. synchronized boolean isIncluded(final InetSocketAddress address) { - return emptyInServiceNodeLists || Iterables.any( - allDNs.get(address.getAddress()), - new Predicate() { - public boolean apply(DatanodeAdminProperties input) { - return input.getPort() == 0 || - input.getPort() == address.getPort(); - } - }); + return emptyInServiceNodeLists || allDNs.get(address.getAddress()) + .stream().anyMatch( + input -> input.getPort() == 0 || + input.getPort() == address.getPort()); } synchronized boolean isExcluded(final InetSocketAddress address) { - return Iterables.any(allDNs.get(address.getAddress()), - new Predicate() { - public boolean apply(DatanodeAdminProperties input) { - return input.getAdminState().equals( - AdminStates.DECOMMISSIONED) && - (input.getPort() == 0 || - input.getPort() == address.getPort()); - } - }); + return allDNs.get(address.getAddress()).stream().anyMatch( + input -> input.getAdminState().equals( + AdminStates.DECOMMISSIONED) && + (input.getPort() == 0 || + input.getPort() == address.getPort())); } synchronized String getUpgradeDomain(final InetSocketAddress address) { - Iterable datanode = Iterables.filter( - allDNs.get(address.getAddress()), - new Predicate() { - public boolean apply(DatanodeAdminProperties input) { - return (input.getPort() == 0 || - input.getPort() == address.getPort()); - } - }); + Iterable datanode = + allDNs.get(address.getAddress()).stream().filter( + input -> (input.getPort() == 0 || + input.getPort() == address.getPort())).collect( + Collectors.toList()); return datanode.iterator().hasNext() ? datanode.iterator().next().getUpgradeDomain() : null; } @@ -127,36 +115,22 @@ public Iterator iterator() { } Iterable getExcludes() { - return new Iterable() { - @Override - public Iterator iterator() { - return new HostIterator( - Collections2.filter(allDNs.entries(), - new Predicate>() { - public boolean apply(java.util.Map.Entry entry) { - return entry.getValue().getAdminState().equals( - AdminStates.DECOMMISSIONED); - } - } - )); - } - }; + return () -> new HostIterator( + allDNs.entries().stream().filter( + entry -> entry.getValue().getAdminState().equals( + AdminStates.DECOMMISSIONED)).collect( + Collectors.toList())); } synchronized long getMaintenanceExpireTimeInMS( final InetSocketAddress address) { - Iterable datanode = Iterables.filter( - allDNs.get(address.getAddress()), - new Predicate() { - public boolean apply(DatanodeAdminProperties input) { - return input.getAdminState().equals( + Iterable datanode = + allDNs.get(address.getAddress()).stream().filter( + input -> input.getAdminState().equals( AdminStates.IN_MAINTENANCE) && (input.getPort() == 0 || - input.getPort() == address.getPort()); - } - }); + input.getPort() == address.getPort())).collect( + Collectors.toList()); // if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS // set in the config. return datanode.iterator().hasNext() ? diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java index d0245d8035..de8734f15e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; - +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -34,8 +34,6 @@ import org.apache.hadoop.hdfs.server.common.Util; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Collections2; -import com.google.common.base.Predicate; /** * @@ -116,18 +114,15 @@ public NameNodeResourceChecker(Configuration conf) throws IOException { Collection extraCheckedVolumes = Util.stringCollectionAsURIs(conf .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY)); - - Collection localEditDirs = Collections2.filter( - FSNamesystem.getNamespaceEditsDirs(conf), - new Predicate() { - @Override - public boolean apply(URI input) { - if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { - return true; - } - return false; - } - }); + + Collection localEditDirs = + FSNamesystem.getNamespaceEditsDirs(conf).stream().filter( + input -> { + if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { + return true; + } + return false; + }).collect(Collectors.toList()); // Add all the local edits dirs, marking some as required if they are // configured as such. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java index 515f164bd8..f13ef61134 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Date; - +import java.util.stream.Collectors; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSUtil; @@ -38,9 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.XAttrFeature; import org.apache.hadoop.hdfs.util.ReadOnlyList; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.hadoop.security.AccessControlException; /** Snapshot of a sub-tree in the namesystem. */ @@ -149,20 +146,14 @@ static Snapshot read(DataInput in, FSImageFormat.Loader loader) static public class Root extends INodeDirectory { Root(INodeDirectory other) { // Always preserve ACL, XAttr. - super(other, false, Lists.newArrayList( - Iterables.filter(Arrays.asList(other.getFeatures()), new Predicate() { - - @Override - public boolean apply(Feature input) { - if (AclFeature.class.isInstance(input) + super(other, false, Arrays.asList(other.getFeatures()).stream().filter( + input -> { + if (AclFeature.class.isInstance(input) || XAttrFeature.class.isInstance(input)) { return true; } return false; - } - - })) - .toArray(new Feature[0])); + }).collect(Collectors.toList()).toArray(new Feature[0])); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 4d0beaa3b9..a8f19499ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -74,7 +74,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -355,14 +354,9 @@ private Set getFileCandidates(Set candidates, : this.logAggregationContext.getRolledLogsExcludePattern(), candidates, true); - Iterable mask = - Iterables.filter(candidates, new Predicate() { - @Override - public boolean apply(File next) { - return !alreadyUploadedLogFiles - .contains(getLogFileMetaData(next)); - } - }); + Iterable mask = Iterables.filter(candidates, (input) -> + !alreadyUploadedLogFiles + .contains(getLogFileMetaData(input))); return Sets.newHashSet(mask); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index ab6eb61320..2bf5f4e6a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -35,7 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; - +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -532,17 +530,12 @@ protected void cleanOldLogs(Path remoteNodeLogFileForApp, Set status = new HashSet(Arrays.asList(remoteFS.listStatus(appDir))); - Iterable mask = - Iterables.filter(status, new Predicate() { - @Override - public boolean apply(FileStatus next) { - return next.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - && !next.getPath().getName().endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX); - } - }); - status = Sets.newHashSet(mask); + status = status.stream().filter( + next -> next.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + && !next.getPath().getName().endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX)).collect( + Collectors.toSet()); // Normally, we just need to delete one oldest log // before we upload a new log. // If we can not delete the older logs in this cycle, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index e94a92a6e2..20c112b85b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -43,6 +41,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -706,16 +705,12 @@ public int compare(ContainerLogMeta o1, ContainerLogMeta o2) { public Map parseCheckSumFiles( List fileList) throws IOException { Map checkSumFiles = new HashMap<>(); - Set status = new HashSet(fileList); - Iterable mask = - Iterables.filter(status, new Predicate() { - @Override - public boolean apply(FileStatus next) { - return next.getPath().getName().endsWith( - CHECK_SUM_FILE_SUFFIX); - } - }); - status = Sets.newHashSet(mask); + Set status = + new HashSet<>(fileList).stream().filter( + next -> next.getPath().getName().endsWith( + CHECK_SUM_FILE_SUFFIX)).collect( + Collectors.toSet()); + FileContext fc = null; for (FileStatus file : status) { FSDataInputStream checksumFileInputStream = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 245dc103e9..bcb4054a55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -663,16 +662,9 @@ public Set doContainerLogAggregation( .getCurrentUpLoadedFileMeta()); // if any of the previous uploaded logs have been deleted, // we need to remove them from alreadyUploadedLogs - Iterable mask = - Iterables.filter(uploadedFileMeta, new Predicate() { - @Override - public boolean apply(String next) { - return logValue.getAllExistingFilesMeta().contains(next); - } - }); - - this.uploadedFileMeta = Sets.newHashSet(mask); - + this.uploadedFileMeta = uploadedFileMeta.stream().filter( + next -> logValue.getAllExistingFilesMeta().contains(next)).collect( + Collectors.toSet()); // need to return files uploaded or older-than-retention clean up. return Sets.union(logValue.getCurrentUpLoadedFilesPath(), logValue.getObsoleteRetentionLogFiles());