HADOOP-17101. Replace Guava Function with Java8+ Function
Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
(cherry picked from commit 98fcffe93f
)
This commit is contained in:
parent
292e22578a
commit
5969922305
@ -123,7 +123,7 @@
|
|||||||
<property name="regexp" value="true"/>
|
<property name="regexp" value="true"/>
|
||||||
<property name="illegalPkgs" value="^sun\.[^.]+"/>
|
<property name="illegalPkgs" value="^sun\.[^.]+"/>
|
||||||
<property name="illegalClasses"
|
<property name="illegalClasses"
|
||||||
value="^com\.google\.common\.base\.(Optional)"/>
|
value="^com\.google\.common\.base\.(Optional|Function), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
|
||||||
</module>
|
</module>
|
||||||
<module name="RedundantImport"/>
|
<module name="RedundantImport"/>
|
||||||
<module name="UnusedImports"/>
|
<module name="UnusedImports"/>
|
||||||
|
@ -17,15 +17,12 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.HashMultimap;
|
import com.google.common.collect.HashMultimap;
|
||||||
import com.google.common.collect.Iterators;
|
|
||||||
import com.google.common.collect.Multimap;
|
import com.google.common.collect.Multimap;
|
||||||
import com.google.common.collect.UnmodifiableIterator;
|
import com.google.common.collect.UnmodifiableIterator;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -101,14 +98,16 @@ public InetSocketAddress next() {
|
|||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder("HostSet(");
|
StringBuilder sb = new StringBuilder("HostSet(");
|
||||||
Joiner.on(",").appendTo(sb, Iterators.transform(iterator(),
|
Iterator<InetSocketAddress> iter = iterator();
|
||||||
new Function<InetSocketAddress, String>() {
|
String sep = "";
|
||||||
@Override
|
while (iter.hasNext()) {
|
||||||
public String apply(@Nullable InetSocketAddress addr) {
|
InetSocketAddress addr = iter.next();
|
||||||
assert addr != null;
|
sb.append(sep);
|
||||||
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
|
sb.append(addr.getAddress().getHostAddress());
|
||||||
}
|
sb.append(':');
|
||||||
}));
|
sb.append(addr.getPort());
|
||||||
return sb.append(")").toString();
|
sep = ",";
|
||||||
|
}
|
||||||
|
return sb.append(')').toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,10 @@
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
@ -38,13 +40,9 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.ImmutableListMultimap;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Multimaps;
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -634,7 +632,7 @@ public void apply(JournalAndStream jas) throws IOException {
|
|||||||
*/
|
*/
|
||||||
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
|
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
|
||||||
// Collect RemoteEditLogs available from each FileJournalManager
|
// Collect RemoteEditLogs available from each FileJournalManager
|
||||||
List<RemoteEditLog> allLogs = Lists.newArrayList();
|
List<RemoteEditLog> allLogs = new ArrayList<>();
|
||||||
for (JournalAndStream j : journals) {
|
for (JournalAndStream j : journals) {
|
||||||
if (j.getManager() instanceof FileJournalManager) {
|
if (j.getManager() instanceof FileJournalManager) {
|
||||||
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
||||||
@ -645,15 +643,17 @@ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group logs by their starting txid
|
// Group logs by their starting txid
|
||||||
ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
|
final Map<Long, List<RemoteEditLog>> logsByStartTxId = new HashMap<>();
|
||||||
Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
|
allLogs.forEach(input -> {
|
||||||
|
long key = RemoteEditLog.GET_START_TXID.apply(input);
|
||||||
|
logsByStartTxId.computeIfAbsent(key, k-> new ArrayList<>()).add(input);
|
||||||
|
});
|
||||||
long curStartTxId = fromTxId;
|
long curStartTxId = fromTxId;
|
||||||
|
List<RemoteEditLog> logs = new ArrayList<>();
|
||||||
List<RemoteEditLog> logs = Lists.newArrayList();
|
|
||||||
while (true) {
|
while (true) {
|
||||||
ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
|
List<RemoteEditLog> logGroup =
|
||||||
|
logsByStartTxId.getOrDefault(curStartTxId, Collections.emptyList());
|
||||||
if (logGroup.isEmpty()) {
|
if (logGroup.isEmpty()) {
|
||||||
// we have a gap in logs - for example because we recovered some old
|
// we have a gap in logs - for example because we recovered some old
|
||||||
// storage directory with ancient logs. Clear out any logs we've
|
// storage directory with ancient logs. Clear out any logs we've
|
||||||
|
@ -17,8 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.protocol;
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.collect.ComparisonChain;
|
import com.google.common.collect.ComparisonChain;
|
||||||
|
import java.util.function.Function;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
|
||||||
public class RemoteEditLog implements Comparable<RemoteEditLog> {
|
public class RemoteEditLog implements Comparable<RemoteEditLog> {
|
||||||
@ -82,16 +82,13 @@ public int hashCode() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Guava <code>Function</code> which applies {@link #getStartTxId()}
|
* Java <code>Function</code> which applies {@link #getStartTxId()}
|
||||||
*/
|
*/
|
||||||
public static final Function<RemoteEditLog, Long> GET_START_TXID =
|
public static final Function<RemoteEditLog, Long> GET_START_TXID =
|
||||||
new Function<RemoteEditLog, Long>() {
|
log -> {
|
||||||
@Override
|
|
||||||
public Long apply(RemoteEditLog log) {
|
|
||||||
if (null == log) {
|
if (null == log) {
|
||||||
return HdfsServerConstants.INVALID_TXID;
|
return HdfsServerConstants.INVALID_TXID;
|
||||||
}
|
}
|
||||||
return log.getStartTxId();
|
return log.getStartTxId();
|
||||||
}
|
};
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
@ -37,9 +37,7 @@
|
|||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.LongAccumulator;
|
import java.util.concurrent.atomic.LongAccumulator;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -304,15 +302,11 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN
|
|||||||
public static <P extends FailoverProxyProvider<?>> void
|
public static <P extends FailoverProxyProvider<?>> void
|
||||||
setFailoverConfigurations(Configuration conf, String logicalName,
|
setFailoverConfigurations(Configuration conf, String logicalName,
|
||||||
List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
|
List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
|
||||||
setFailoverConfigurations(conf, logicalName,
|
final List<String> addresses = new ArrayList();
|
||||||
Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
|
nnAddresses.forEach(
|
||||||
|
addr -> addresses.add(
|
||||||
// transform the inet address to a simple string
|
"hdfs://" + addr.getHostName() + ":" + addr.getPort()));
|
||||||
@Override
|
setFailoverConfigurations(conf, logicalName, addresses, classFPP);
|
||||||
public String apply(InetSocketAddress addr) {
|
|
||||||
return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
|
|
||||||
}
|
|
||||||
}), classFPP);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <P extends FailoverProxyProvider<?>>
|
public static <P extends FailoverProxyProvider<?>>
|
||||||
|
@ -23,8 +23,7 @@
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nullable;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
@ -49,8 +48,6 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
@ -353,13 +350,10 @@ public static void verifyFileStatuses(List<Path> expectedPaths,
|
|||||||
List<FileStatus> fetchedStatuses, final FileSystem localFs) {
|
List<FileStatus> fetchedStatuses, final FileSystem localFs) {
|
||||||
Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
|
Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size());
|
||||||
|
|
||||||
Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths,
|
Iterable<Path> fqExpectedPaths =
|
||||||
new Function<Path, Path>() {
|
expectedPaths.stream().map(
|
||||||
@Override
|
input -> localFs.makeQualified(input)).collect(Collectors.toList());
|
||||||
public Path apply(Path input) {
|
|
||||||
return localFs.makeQualified(input);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
|
Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths);
|
||||||
for (FileStatus fileStatus : fetchedStatuses) {
|
for (FileStatus fileStatus : fetchedStatuses) {
|
||||||
@ -374,13 +368,10 @@ public Path apply(Path input) {
|
|||||||
|
|
||||||
|
|
||||||
private void verifySplits(List<String> expected, List<InputSplit> splits) {
|
private void verifySplits(List<String> expected, List<InputSplit> splits) {
|
||||||
Iterable<String> pathsFromSplits = Iterables.transform(splits,
|
Iterable<String> pathsFromSplits =
|
||||||
new Function<InputSplit, String>() {
|
splits.stream().map(
|
||||||
@Override
|
input-> ((FileSplit) input).getPath().toString())
|
||||||
public String apply(@Nullable InputSplit input) {
|
.collect(Collectors.toList());
|
||||||
return ((FileSplit) input).getPath().toString();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Set<String> expectedSet = Sets.newHashSet(expected);
|
Set<String> expectedSet = Sets.newHashSet(expected);
|
||||||
for (String splitPathString : pathsFromSplits) {
|
for (String splitPathString : pathsFromSplits) {
|
||||||
|
@ -22,7 +22,6 @@
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.lang3.Range;
|
import org.apache.commons.lang3.Range;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
@ -35,8 +34,6 @@
|
|||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProtoOrBuilder;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@ -88,13 +85,8 @@ private void mergeLocalToBuilder() {
|
|||||||
}
|
}
|
||||||
if (applicationStates != null && !applicationStates.isEmpty()) {
|
if (applicationStates != null && !applicationStates.isEmpty()) {
|
||||||
builder.clearApplicationStates();
|
builder.clearApplicationStates();
|
||||||
builder.addAllApplicationStates(Iterables.transform(applicationStates,
|
applicationStates.forEach(input ->
|
||||||
new Function<YarnApplicationState, YarnApplicationStateProto>() {
|
builder.addApplicationStates(ProtoUtils.convertToProtoFormat(input)));
|
||||||
@Override
|
|
||||||
public YarnApplicationStateProto apply(YarnApplicationState input) {
|
|
||||||
return ProtoUtils.convertToProtoFormat(input);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
if (applicationTags != null && !applicationTags.isEmpty()) {
|
if (applicationTags != null && !applicationTags.isEmpty()) {
|
||||||
builder.clearApplicationTags();
|
builder.clearApplicationTags();
|
||||||
|
Loading…
Reference in New Issue
Block a user