diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 946abfbba7..4c4de7f35b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -110,7 +110,7 @@ public void destroyPipeline() throws IOException {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
-        .groupRemove(group.getGroupId(), peer.getId()));
+        .groupRemove(group.getGroupId(), true, peer.getId()));
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 492be82359..856d1136fa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,11 @@ private HddsConfigKeys() {
   public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
       "60s";
 
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL =
+      "hdds.pipeline.report.interval";
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
+      "60s";
+
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index ef148e5550..777efa70f8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -34,7 +34,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 
 /**
@@ -83,7 +83,7 @@ public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState,
     this.type = replicationType;
     this.factor = replicationFactor;
     this.id = id;
-    datanodes = new TreeMap<>();
+    datanodes = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -151,9 +151,21 @@ public DatanodeDetails getLeader() {
     return getDatanodes().get(leaderID);
   }
 
-  public void addMember(DatanodeDetails datanodeDetails) {
-    datanodes.put(datanodeDetails.getUuid().toString(),
-        datanodeDetails);
+  /**
+   * Adds a datanode to the pipeline.
+   * @param datanodeDetails datanode to be added.
+   * @return true if the datanode was not already present, false otherwise
+   */
+  public boolean addMember(DatanodeDetails datanodeDetails) {
+    return datanodes.put(datanodeDetails.getUuid().toString(),
+        datanodeDetails) == null;
+
+  }
+
+  public void resetPipeline() {
+    // reset datanodes in pipeline and learn about them through
+    // pipeline reports on SCM restart
+    datanodes.clear();
   }
 
   public Map<String, DatanodeDetails> getDatanodes() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
index 473ebc5a99..6e27a71f3f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java
@@ -28,7 +28,7 @@
  * in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize
  * the ratis group they are part of.
  */
-public class PipelineID {
+public final class PipelineID implements Comparable<PipelineID> {
 
   private UUID id;
   private RaftGroupId groupId;
@@ -42,8 +42,12 @@ public static PipelineID randomId() {
     return new PipelineID(UUID.randomUUID());
   }
 
+  public static PipelineID valueOf(UUID id) {
+    return new PipelineID(id);
+  }
+
   public static PipelineID valueOf(RaftGroupId groupId) {
-    return new PipelineID(groupId.getUuid());
+    return valueOf(groupId.getUuid());
   }
 
   public RaftGroupId getRaftGroupID() {
@@ -67,6 +71,11 @@ public String toString() {
     return "pipelineId=" + id;
   }
 
+  @Override
+  public int compareTo(PipelineID o) {
+    return this.id.compareTo(o.id);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a74124e30e..f7681e8e70 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -224,6 +224,14 @@
       received from SCM to SCM. Unit could be defined with postfix
       (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>hdds.pipeline.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, PIPELINE, MANAGEMENT</tag>
+    <description>Time interval of the datanode to send pipeline report. Each
+      datanode periodically sends pipeline report to SCM. Unit could be
+      defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
 
   <property>
     <name>ozone.administrators</name>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
index 580d0279ad..d505be379c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -312,4 +315,22 @@ public static int getContainerPort(Configuration conf) {
     services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
     return services;
   }
+
+  public static String getOzoneDatanodeRatisDirectory(Configuration conf) {
+    final String ratisDir = File.separator + "ratis";
+    String storageDir = conf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = conf.get(OzoneConfigKeys
+          .OZONE_METADATA_DIRS);
+      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
+          "cannot be null, Please check your configs.");
+      storageDir = storageDir.concat(ratisDir);
+      LOG.warn("Storage directory for Ratis is not configured. " +
+          "Mapping Ratis storage under {}. It is a good idea " +
+          "to map this to an SSD disk.", storageDir);
+    }
+    return storageDir;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
new file mode 100644
index 0000000000..e7f4347e9e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes PipelineReport which will be sent to SCM as part of heartbeat.
+ * PipelineReport consists of the following information for each pipeline:
+ *   - pipelineID
+ *
+ */
+public class PipelineReportPublisher extends
+    ReportPublisher<PipelineReportsProto> {
+
+  private Long pipelineReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (pipelineReportInterval == null) {
+      pipelineReportInterval = getConf().getTimeDuration(
+          HDDS_PIPELINE_REPORT_INTERVAL,
+          HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= pipelineReportInterval,
+          HDDS_PIPELINE_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency.");
+    }
+    // Add a random delay (0~60s) on top of the pipeline report
+    // interval (60s) so that the SCM is not overwhelmed by pipeline
+    // reports sent in sync.
+    return pipelineReportInterval + getRandomReportDelay();
+  }
+
+  private long getRandomReportDelay() {
+    return RandomUtils.nextLong(0, pipelineReportInterval);
+  }
+
+  @Override
+  protected PipelineReportsProto getReport() {
+    return getContext().getParent().getContainer().getPipelineReport();
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index ea89280729..1c456a0519 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -19,6 +19,8 @@
 
 import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -53,6 +55,8 @@ public ReportPublisherFactory(Configuration conf) {
         ContainerReportPublisher.class);
     report2publisher.put(CommandStatusReportsProto.class,
         CommandStatusReportPublisher.class);
+    report2publisher.put(PipelineReportsProto.class,
+        PipelineReportPublisher.class);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index ccab0956e7..690aa015b1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -21,6 +21,8 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.hdds.protocol.proto
@@ -108,13 +110,15 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
     rpcEndPoint.lock();
 
     try {
-      ContainerReportsProto contianerReport = datanodeContainerManager
+      ContainerReportsProto containerReport = datanodeContainerManager
          .getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
+      PipelineReportsProto pipelineReportsProto =
+          datanodeContainerManager.getPipelineReport();
       // TODO : Add responses to the command Queue.
       SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
           .register(datanodeDetails.getProtoBufMessage(), nodeReport,
-              contianerReport);
+              containerReport, pipelineReportsProto);
       Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
               .equals(datanodeDetails.getUuid()),
           "Unexpected datanode ID in the response.");
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4a90144f4e..83e742c171 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -24,6 +24,9 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -38,6 +41,9 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
@@ -47,6 +53,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   private static final Logger LOG =
       LoggerFactory.getLogger(XceiverServerGrpc.class);
   private int port;
+  private UUID id;
   private Server server;
   private final ContainerDispatcher storageContainer;
 
@@ -59,6 +66,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
       ContainerDispatcher dispatcher, BindableService...
       additionalServices) {
     Preconditions.checkNotNull(conf);
+    this.id = datanodeDetails.getUuid();
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     // Get an available port on current node and
@@ -123,4 +131,12 @@ public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) {
     storageContainer.dispatch(request);
   }
+
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    return Collections.singletonList(
+        PipelineReport.newBuilder()
+            .setPipelineID(PipelineID.valueOf(id).getProtobuf())
+            .build());
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 1863f6d759..8c3fa5c8dd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -21,8 +21,11 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
  * containers. */
@@ -49,4 +52,10 @@ public interface XceiverServerSpi {
   void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID)
       throws IOException;
+
+  /**
+   * Get pipeline report for the XceiverServer instance.
+   * @return list of reports, one for each pipeline.
+   */
+  List<PipelineReport> getPipelineReport();
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 24ea0b9a0d..d88995b3d9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -19,17 +19,18 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -68,6 +69,8 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -96,12 +99,12 @@ private static long nextCallId() {
   private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
 
-  private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
+  private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext context)
       throws IOException {
     Objects.requireNonNull(dd, "id == null");
     this.port = port;
-    RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+    RaftProperties serverProperties = newRaftProperties(conf);
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
@@ -118,15 +121,13 @@ private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
         new ContainerStateMachine(dispatcher, chunkExecutor, this);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
         .setStateMachine(stateMachine)
         .build();
   }
 
-  private RaftProperties newRaftProperties(Configuration conf,
-      String storageDir) {
+  private RaftProperties newRaftProperties(Configuration conf) {
     final RaftProperties properties = new RaftProperties();
 
     // Set rpc type
@@ -235,6 +236,7 @@ private RaftProperties newRaftProperties(Configuration conf) {
     nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
 
     // Set the ratis storage directory
+    String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
 
     // For grpc set the maximum message size
@@ -253,23 +255,9 @@ public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
       ContainerDispatcher dispatcher, StateContext context) throws IOException {
-    final String ratisDir = File.separator + "ratis";
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
-    String storageDir = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
-
-    if (Strings.isNullOrEmpty(storageDir)) {
-      storageDir = ozoneConf.get(OzoneConfigKeys
-          .OZONE_METADATA_DIRS);
-      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
-          "cannot be null, Please check your configs.");
-      storageDir = storageDir.concat(ratisDir);
-      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
-          "storage under {}. It is a good idea to map this to an SSD disk.",
-          storageDir);
-    }
 
     // Get an available port on current node and
     // use that as the container port
@@ -282,13 +270,6 @@ public static XceiverServerRatis newXceiverServerRatis(
       socket.bind(address);
       localPort = socket.getLocalPort();
       LOG.info("Found a free port for the server : {}", localPort);
-      // If we have random local ports configured this means that it
-      // probably running under MiniOzoneCluster. Ratis locks the storage
-      // directories, so we need to pass different local directory for each
-      // local instance. So we map ratis directories under datanode ID.
-      storageDir =
-          storageDir.concat(File.separator +
-              datanodeDetails.getUuidString());
     } catch (IOException e) {
       LOG.error("Unable find a random free port for the server, "
           + "fallback to use default port {}", localPort, e);
@@ -296,7 +277,7 @@ public static XceiverServerRatis newXceiverServerRatis(
     }
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
-    return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
+    return new XceiverServerRatis(datanodeDetails, localPort,
         dispatcher, ozoneConf, context);
   }
 
@@ -363,7 +344,7 @@ private void processReply(RaftClientReply reply) {
   public void submitRequest(
       ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
       throws IOException {
-    // ReplicationLevel.ALL ensures the transactions corresponding to
+    // ReplicationLevel.MAJORITY ensures the transactions corresponding to
     // the request here are applied on all the raft servers.
     RaftClientRequest raftClientRequest = createRaftClientRequest(request,
         pipelineID,
@@ -427,13 +408,27 @@ private void handlePipelineFailure(RaftGroupId groupId,
         + ".Reason : " + action.getClosePipeline().getDetailedReason());
   }
 
-  void handleNodeSlowness(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    try {
+      Iterable<RaftGroupId> gids = server.getGroupIds();
+      List<PipelineReport> reports = new ArrayList<>();
+      for (RaftGroupId groupId : gids) {
+        reports.add(PipelineReport.newBuilder()
+            .setPipelineID(PipelineID.valueOf(groupId).getProtobuf())
+            .build());
+      }
+      return reports;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 
-  void handleNoLeader(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 72a5804a9e..ebacf756fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -25,6 +25,8 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -164,6 +166,16 @@ public ContainerSet getContainerSet() {
     return this.containerSet.getContainerReport();
   }
 
+  public PipelineReportsProto getPipelineReport() {
+    PipelineReportsProto.Builder pipelineReportsProto =
+        PipelineReportsProto.newBuilder();
+    for (XceiverServerSpi serverInstance : server) {
+      pipelineReportsProto
+          .addAllPipelineReport(serverInstance.getPipelineReport());
+    }
+    return pipelineReportsProto.build();
+  }
+
   /**
    * Submit ContainerRequest.
    * @param request
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index a950a3144a..9296524fca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -18,6 +18,8 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -69,9 +71,11 @@ SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat)
    * @param containerReportsRequestProto - Container Reports.
    * @return SCM Command.
    */
-  SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
-      NodeReportProto nodeReport, ContainerReportsProto
-      containerReportsRequestProto) throws IOException;
+  SCMRegisteredResponseProto register(
+      DatanodeDetailsProto datanodeDetails,
+      NodeReportProto nodeReport,
+      ContainerReportsProto containerReportsRequestProto,
+      PipelineReportsProto pipelineReports) throws IOException;
 
   /**
    * Used by datanode to send block deletion ACK to SCM.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index c9ef43f9c1..b3c3eb359e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -19,6 +19,8 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -51,10 +53,12 @@ public interface StorageContainerNodeProtocol {
    * Register the node if the node finds that it is not registered with any SCM.
    * @param datanodeDetails DatanodeDetails
    * @param nodeReport NodeReportProto
+   * @param pipelineReport PipelineReportsProto
    * @return SCMHeartbeatResponseProto
    */
   RegisteredCommand register(DatanodeDetails datanodeDetails,
-      NodeReportProto nodeReport);
+      NodeReportProto nodeReport,
+      PipelineReportsProto pipelineReport);
 
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 40fe189600..b9cf6f9fc9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -19,6 +19,8 @@
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -149,12 +151,14 @@ public SCMHeartbeatResponseProto sendHeartbeat(
   @Override
   public SCMRegisteredResponseProto register(
       DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsRequestProto)
+      ContainerReportsProto containerReportsRequestProto,
+      PipelineReportsProto pipelineReportsProto)
       throws IOException {
     SCMRegisterRequestProto.Builder req =
         SCMRegisterRequestProto.newBuilder();
     req.setDatanodeDetails(datanodeDetailsProto);
     req.setContainerReport(containerReportsRequestProto);
+    req.setPipelineReports(pipelineReportsProto);
     req.setNodeReport(nodeReport);
     final SCMRegisteredResponseProto response;
     try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 7e8bd8a2ac..ed0182263a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -18,6 +18,8 @@
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -76,8 +78,9 @@ public SCMRegisteredResponseProto register(RpcController controller,
       ContainerReportsProto containerRequestProto = request
           .getContainerReport();
       NodeReportProto dnNodeReport = request.getNodeReport();
+      PipelineReportsProto pipelineReport = request.getPipelineReports();
       return impl.register(request.getDatanodeDetails(), dnNodeReport,
-          containerRequestProto);
+          containerRequestProto, pipelineReport);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 0a6934342e..78758cbe46 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -52,6 +52,7 @@ message SCMRegisterRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   required NodeReportProto nodeReport = 2;
   required ContainerReportsProto containerReport = 3;
+  required PipelineReportsProto pipelineReports = 4;
 }
 
 /**
@@ -82,6 +83,7 @@ message SCMHeartbeatRequestProto {
   optional CommandStatusReportsProto commandStatusReport = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
+  optional PipelineReportsProto pipelineReports = 7;
 }
 
 /*
@@ -163,6 +165,14 @@ message ContainerAction {
   optional Reason reason = 3;
 }
 
+message PipelineReport {
+  required PipelineID pipelineID = 1;
+}
+
+message PipelineReportsProto {
+  repeated PipelineReport pipelineReport = 1;
+}
+
 message PipelineActionsProto {
   repeated PipelineAction pipelineActions = 1;
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 751775f627..27b6272ef6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -18,6 +18,10 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 
@@ -214,8 +218,8 @@ private void sleepIfNeeded() {
   public StorageContainerDatanodeProtocolProtos
       .SCMRegisteredResponseProto register(
       DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-      StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-          containerReportsRequestProto)
+      ContainerReportsProto containerReportsRequestProto,
+      PipelineReportsProto pipelineReportsProto)
       throws IOException {
     rpcCount.incrementAndGet();
     updateNodeReport(datanodeDetailsProto, nodeReport);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 206e24be3c..eb0a0b44e2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -466,24 +466,6 @@ public ContainerWithPipeline getMatchingContainerWithPipeline(
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
-  public void handlePipelineClose(PipelineID pipelineID) {
-    try {
-      Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
-      if (pipeline != null) {
-        pipelineSelector.finalizePipeline(pipeline);
-      } else {
-        LOG.debug("pipeline:{} not found", pipelineID);
-      }
-    } catch (Exception e) {
-      LOG.info("failed to close pipeline:{}", pipelineID, e);
-    }
-  }
-
-  public Set<PipelineID> getPipelineOnDatanode(
-      DatanodeDetails datanodeDetails) {
-    return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
-  }
-
   /**
    * Process container report from Datanode.
    *
@@ -710,7 +692,6 @@ public MetadataStore getContainerStore() {
     return containerStore;
   }
 
-  @VisibleForTesting
   public PipelineSelector getPipelineSelector() {
     return pipelineSelector;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index dcbd49c5ec..3f156deb3e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -89,20 +89,20 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
             .map(ContainerID::new)
             .collect(Collectors.toSet());
 
-      ReportResult reportResult = node2ContainerMap
+      ReportResult<ContainerID> reportResult = node2ContainerMap
           .processReport(datanodeOrigin.getUuid(), containerIds);
 
       //we have the report, so we can update the states for the next iteration.
       node2ContainerMap
          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
 
-      for (ContainerID containerID : reportResult.getMissingContainers()) {
+      for (ContainerID containerID : reportResult.getMissingEntries()) {
         containerStateManager
             .removeContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }
 
-      for (ContainerID containerID : reportResult.getNewContainers()) {
+      for (ContainerID containerID : reportResult.getNewEntries()) {
         containerStateManager.addContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index 1b0c57c352..5ed80cb47f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -25,13 +25,12 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -138,15 +137,5 @@ ContainerWithPipeline getMatchingContainerWithPipeline(long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
 
-  /**
-   * Handle a pipeline close event.
-   * @param pipelineID pipeline id
-   */
-  void handlePipelineClose(PipelineID pipelineID);
-
-  /**
-   * Get set of pipeline for a specific datanode.
-   * @param datanodeDetails datanode for which pipelines needs to be fetched.
-   */
-  Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
+  PipelineSelector getPipelineSelector();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 9d72eb106a..745e052be3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,9 +27,12 @@
     .DeleteBlockCommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+    .CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -72,8 +75,7 @@ public final class SCMEvents {
 
   /**
    * ContainerReports are send out by Datanodes. This report is received by
-   * SCMDatanodeHeartbeatDispatcher and Container_Report Event
-   * isTestSCMDatanodeHeartbeatDispatcher generated.
+   * SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
    */
   public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
@@ -86,6 +88,13 @@ public final class SCMEvents {
       CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
       "Container_Actions");
 
+  /**
+   * PipelineReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and Pipeline_Report Event is generated.
+   */
+  public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT =
+      new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
+
   /**
    * PipelineActions are sent by Datanode. This event is received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index fca08bd177..58da1ccc7d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,6 +19,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -363,7 +365,8 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
    */
   @Override
   public RegisteredCommand register(
-      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
+      DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
+      PipelineReportsProto pipelineReportsProto) {
 
     InetAddress dnAddress = Server.getRemoteIp();
     if (dnAddress != null) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index b435e777ae..ddbba82533 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,17 +19,13 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-
 /**
  * Handles Stale node event.
  */
@@ -37,22 +33,17 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
   static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
 
   private final Node2ContainerMap node2ContainerMap;
-  private final Mapping containerManager;
+  private final PipelineSelector pipelineSelector;
 
   public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
-      Mapping containerManager) {
+      PipelineSelector pipelineSelector) {
     this.node2ContainerMap = node2ContainerMap;
-    this.containerManager = containerManager;
+    this.pipelineSelector = pipelineSelector;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    Set<PipelineID> pipelineIDs =
-        containerManager.getPipelineOnDatanode(datanodeDetails);
-    for (PipelineID id : pipelineIDs) {
-      LOG.info("closing pipeline {}.", id);
-      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
-    }
+    pipelineSelector.handleStaleNode(datanodeDetails);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 97c254be31..549080a25e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -18,21 +18,15 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .DUPLICATE_DATANODE;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .NO_SUCH_DATANODE;
 
@@ -40,26 +34,23 @@
  * This data structure maintains the list of containers that is on a datanode.
  * This information is built from the DN container reports.
  */
-public class Node2ContainerMap {
-  private final Map<UUID, Set<ContainerID>> dn2ContainerMap;
+public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
 
   /**
    * Constructs a Node2ContainerMap Object.
   */
   public Node2ContainerMap() {
-    dn2ContainerMap = new ConcurrentHashMap<>();
+    super();
   }
 
   /**
-   * Returns true if this a datanode that is already tracked by
-   * Node2ContainerMap.
+   * Returns the set of containers reported by this datanode, or an empty set.
    *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
+   * @param datanode - UUID
+   * @return Set of containers or an empty set.
    */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2ContainerMap.containsKey(datanodeID);
+  public Set<ContainerID> getContainers(UUID datanode) {
+    return getObjects(datanode);
   }
 
   /**
@@ -70,13 +61,7 @@ public boolean isKnownDatanode(UUID datanodeID) {
    */
   public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
       throws SCMException {
-    Preconditions.checkNotNull(containerIDs);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
-        != null) {
-      throw new SCMException("Node already exists in the map",
-          DUPLICATE_DATANODE);
-    }
+    super.insertNewDatanode(datanodeID, containerIDs);
   }
 
   /**
@@ -91,103 +76,15 @@ public void setContainersForDatanode(UUID datanodeID,
       Set<ContainerID> containers) throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if (dn2ContainerMap
+    if (dn2ObjectMap
         .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
         == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }
 
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  public void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> null);
-  }
-
-  /**
-   * Returns null if there no containers associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers or Null.
-   */
-  public Set<ContainerID> getContainers(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    return dn2ContainerMap.computeIfPresent(datanode, (k, v) ->
-        Collections.unmodifiableSet(v));
-  }
-
-  public ReportResult processReport(UUID datanodeID, Set<ContainerID>
-      containers) {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(containers);
-
-    if (!isKnownDatanode(datanodeID)) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_DATANODE_FOUND)
-          .setNewContainers(containers)
-          .build();
-    }
-
-    // Conditions like Zero length containers should be handled by removeAll.
-    Set<ContainerID> currentSet = dn2ContainerMap.get(datanodeID);
-    TreeSet<ContainerID> newContainers = new TreeSet<>(containers);
-    newContainers.removeAll(currentSet);
-
-    TreeSet<ContainerID> missingContainers = new TreeSet<>(currentSet);
-    missingContainers.removeAll(containers);
-
-    if (newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.ALL_IS_WELL)
-          .build();
-    }
-
-    if (newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_CONTAINERS)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    // default status & Make compiler happy
-    return ReportResult.ReportResultBuilder.newBuilder()
-        .setStatus(ReportStatus.ALL_IS_WELL)
-        .build();
-  }
-
-  /**
-   * Results possible from processing a container report by
-   * Node2ContainerMapper.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    MISSING_CONTAINERS,
-    NEW_CONTAINERS_FOUND,
-    MISSING_AND_NEW_CONTAINERS_FOUND,
-    NEW_DATANODE_FOUND
-  }
-
   @VisibleForTesting
   public int size() {
-    return dn2ContainerMap.size();
+    return dn2ObjectMap.size();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
new file mode 100644
index 0000000000..e49a79c64f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+import java.util.UUID;
+import java.util.Set;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.HashSet;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
+
+/**
+ * This data structure maintains the set of objects (containers or pipelines)
+ * that are on a datanode. This information is built from the DN reports.
+ */
+public class Node2ObjectsMap<T> {
+  protected final Map<UUID, Set<T>> dn2ObjectMap;
+
+  /**
+   * Constructs a Node2ObjectsMap Object.
+   */
+  public Node2ObjectsMap() {
+    dn2ObjectMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns true if this is a datanode that is already tracked by
+   * Node2ObjectsMap.
+   *
+   * @param datanodeID - UUID of the Datanode.
+   * @return True if this is tracked, false if this map does not know about it.
+   */
+  public boolean isKnownDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    return dn2ObjectMap.containsKey(datanodeID);
+  }
+
+  /**
+   * Insert a new datanode into Node2ObjectsMap.
+   *
+   * @param datanodeID -- Datanode UUID
+   * @param containerIDs - List of ContainerIDs.
+   */
+  public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
+      throws SCMException {
+    Preconditions.checkNotNull(containerIDs);
+    Preconditions.checkNotNull(datanodeID);
+    if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
+      throw new SCMException("Node already exists in the map",
+          DUPLICATE_DATANODE);
+    }
+  }
+
+  /**
+   * Removes datanode Entry from the map.
+   *
+   * @param datanodeID - Datanode ID.
+   */
+  void removeDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
+  }
+
+  /**
+   * Returns the objects associated with this datanode ID, or an empty set.
+   *
+   * @param datanode - UUID
+   * @return Set of objects or an empty set.
+   */
+  Set<T> getObjects(UUID datanode) {
+    Preconditions.checkNotNull(datanode);
+    final Set<T> s = dn2ObjectMap.get(datanode);
+    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+  }
+
+  public ReportResult.ReportResultBuilder<T> newBuilder() {
+    return new ReportResult.ReportResultBuilder<>();
+  }
+
+  public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
+    Preconditions.checkNotNull(datanodeID);
+    Preconditions.checkNotNull(objects);
+
+    if (!isKnownDatanode(datanodeID)) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
+          .setNewEntries(objects)
+          .build();
+    }
+
+    // Conditions like Zero length containers should be handled by removeAll.
+    Set<T> currentSet = dn2ObjectMap.get(datanodeID);
+    TreeSet<T> newObjects = new TreeSet<>(objects);
+    newObjects.removeAll(currentSet);
+
+    TreeSet<T> missingObjects = new TreeSet<>(currentSet);
+    missingObjects.removeAll(objects);
+
+    if (newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+          .build();
+    }
+
+    if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    // default status & Make compiler happy
+    return newBuilder()
+        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+        .build();
+  }
+
+  @VisibleForTesting
+  public int size() {
+    return dn2ObjectMap.size();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index 9bb6cf1587..0c7610fc7b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -19,83 +19,92 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-
 import java.util.Collections;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
 /**
- * A Container Report gets processsed by the Node2Container and returns
- * Report Result class.
+ * A Container/Pipeline Report gets processed by the
+ * Node2ContainerMap/Node2PipelineMap and a ReportResult is returned.
 */
-public class ReportResult {
-  private Node2ContainerMap.ReportStatus status;
-  private Set<ContainerID> missingContainers;
-  private Set<ContainerID> newContainers;
+public final class ReportResult<T> {
+  private ReportStatus status;
+  private Set<T> missingEntries;
+  private Set<T> newEntries;
 
-  ReportResult(Node2ContainerMap.ReportStatus status,
-      Set<ContainerID> missingContainers,
-      Set<ContainerID> newContainers) {
+  private ReportResult(ReportStatus status,
+      Set<T> missingEntries,
+      Set<T> newEntries) {
     this.status = status;
-    Preconditions.checkNotNull(missingContainers);
-    Preconditions.checkNotNull(newContainers);
-    this.missingContainers = missingContainers;
-    this.newContainers = newContainers;
+    Preconditions.checkNotNull(missingEntries);
+    Preconditions.checkNotNull(newEntries);
+    this.missingEntries = missingEntries;
+    this.newEntries = newEntries;
   }
 
-  public Node2ContainerMap.ReportStatus getStatus() {
+  public ReportStatus getStatus() {
     return status;
   }
 
-  public Set<ContainerID> getMissingContainers() {
-    return missingContainers;
+  public Set<T> getMissingEntries() {
+    return missingEntries;
   }
 
-  public Set<ContainerID> getNewContainers() {
-    return newContainers;
+  public Set<T> getNewEntries() {
+    return newEntries;
   }
 
-  static class ReportResultBuilder {
-    private Node2ContainerMap.ReportStatus status;
-    private Set<ContainerID> missingContainers;
-    private Set<ContainerID> newContainers;
+  /**
+   * Result after processing report for node2Object map.
+   * @param <T>
+   */
+  public static class ReportResultBuilder<T> {
+    private ReportStatus status;
+    private Set<T> missingEntries;
+    private Set<T> newEntries;
 
-    static ReportResultBuilder newBuilder() {
-      return new ReportResultBuilder();
-    }
-
-    public ReportResultBuilder setStatus(
-        Node2ContainerMap.ReportStatus newstatus) {
-      this.status = newstatus;
+    public ReportResultBuilder<T> setStatus(
+        ReportStatus newStatus) {
+      this.status = newStatus;
       return this;
     }
 
-    public ReportResultBuilder setMissingContainers(
-        Set<ContainerID> missingContainersLit) {
-      this.missingContainers = missingContainersLit;
+    public ReportResultBuilder<T> setMissingEntries(
+        Set<T> missingEntriesList) {
+      this.missingEntries = missingEntriesList;
       return this;
     }
 
-    public ReportResultBuilder setNewContainers(
-        Set<ContainerID> newContainersList) {
-      this.newContainers = newContainersList;
+    public ReportResultBuilder<T> setNewEntries(
+        Set<T> newEntriesList) {
+      this.newEntries = newEntriesList;
       return this;
     }
 
-    ReportResult build() {
+    public ReportResult<T> build() {
 
-      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
-      Set<ContainerID> nullSafeNewContainers = this.newContainers;
-      if (nullSafeNewContainers == null) {
-        nullSafeNewContainers = Collections.emptySet();
+      Set<T> nullSafeMissingEntries = this.missingEntries;
+      Set<T> nullSafeNewEntries = this.newEntries;
+      if (nullSafeNewEntries == null) {
+        nullSafeNewEntries = Collections.emptySet();
       }
-      if (nullSafeMissingContainers == null) {
-        nullSafeMissingContainers = Collections.emptySet();
+      if (nullSafeMissingEntries == null) {
+        nullSafeMissingEntries = Collections.emptySet();
      }
-      return new ReportResult(status, nullSafeMissingContainers,
-          nullSafeNewContainers);
+      return new ReportResult<T>(status, nullSafeMissingEntries,
+          nullSafeNewEntries);
     }
   }
+
+  /**
+   * Results possible from processing a report.
+ */ + public enum ReportStatus { + ALL_IS_WELL, + MISSING_ENTRIES, + NEW_ENTRIES_FOUND, + MISSING_AND_NEW_ENTRIES_FOUND, + NEW_DATANODE_FOUND, + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java index 363ce71556..87f2222b5f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java @@ -16,19 +16,15 @@ * */ -package org.apache.hadoop.hdds.scm.pipelines; +package org.apache.hadoop.hdds.scm.node.states; -import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; /** * This data structure maintains the list of pipelines which the given datanode is a part of. This * information will be added whenever a new pipeline allocation happens. *

TODO: this information needs to be regenerated from pipeline reports on SCM restart */ -public class Node2PipelineMap { - private final Map> dn2PipelineMap; +public class Node2PipelineMap extends Node2ObjectsMap { /** Constructs a Node2PipelineMap Object. */ public Node2PipelineMap() { - dn2PipelineMap = new ConcurrentHashMap<>(); - } - - /** - * Returns true if this a datanode that is already tracked by Node2PipelineMap. - * - * @param datanodeID - UUID of the Datanode. - * @return True if this is tracked, false if this map does not know about it. - */ - private boolean isKnownDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - return dn2PipelineMap.containsKey(datanodeID); - } - - /** - * Removes datanode Entry from the map. - * - * @param datanodeID - Datanode ID. - */ - public synchronized void removeDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null); + super(); } /** @@ -72,9 +46,7 @@ public synchronized void removeDatanode(UUID datanodeID) { * @return Set of pipelines or Null. */ public Set getPipelines(UUID datanode) { - Preconditions.checkNotNull(datanode); - final Set s = dn2PipelineMap.get(datanode); - return s != null? Collections.unmodifiableSet(s): Collections.emptySet(); + return getObjects(datanode); } /** @@ -85,7 +57,7 @@ public Set getPipelines(UUID datanode) { public synchronized void addPipeline(Pipeline pipeline) { for (DatanodeDetails details : pipeline.getDatanodes().values()) { UUID dnId = details.getUuid(); - dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>()) + dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>()) .add(pipeline.getId()); } } @@ -93,16 +65,11 @@ public synchronized void addPipeline(Pipeline pipeline) { public synchronized void removePipeline(Pipeline pipeline) { for (DatanodeDetails details : pipeline.getDatanodes().values()) { UUID dnId = details.getUuid(); - dn2PipelineMap.computeIfPresent( - dnId, + dn2ObjectMap.computeIfPresent(dnId, (k, v) -> { v.remove(pipeline.getId()); return v; }); } } - - public Map> getDn2PipelineMap() { - return Collections.unmodifiableMap(dn2PipelineMap); - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java index 733dec5633..e49678fee8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java @@ -17,22 +17,36 @@ package org.apache.hadoop.hdds.scm.pipelines; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles pipeline close event. 
 */ public class PipelineCloseHandler implements EventHandler { - private final Mapping mapping; - public PipelineCloseHandler(Mapping mapping) { - this.mapping = mapping; + private static final Logger LOG = LoggerFactory + .getLogger(PipelineCloseHandler.class); + + private final PipelineSelector pipelineSelector; + public PipelineCloseHandler(PipelineSelector pipelineSelector) { + this.pipelineSelector = pipelineSelector; } @Override public void onMessage(PipelineID pipelineID, EventPublisher publisher) { - mapping.handlePipelineClose(pipelineID); + Pipeline pipeline = pipelineSelector.getPipeline(pipelineID); + try { + if (pipeline != null) { + pipelineSelector.finalizePipeline(pipeline); + } else { + LOG.debug("pipeline:{} not found", pipelineID); + } + } catch (Exception e) { + LOG.info("failed to close pipeline:{}", pipelineID, e); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 07ff2b0918..ca2e878637 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.LinkedList; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -36,7 +38,7 @@ public abstract class PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class); - private final ArrayList activePipelines; + protected final ArrayList activePipelines; public PipelineManager() { activePipelines = new ArrayList<>(); @@ -45,7 +47,10 @@ public PipelineManager() { } } - private static class ActivePipelines { + /** + * List of active pipelines. + */ + public static class ActivePipelines { private final List activePipelines; private final AtomicInteger pipelineIndex; @@ -55,10 +60,12 @@ private static class ActivePipelines { } void addPipeline(PipelineID pipelineID) { - activePipelines.add(pipelineID); + if (!activePipelines.contains(pipelineID)) { + activePipelines.add(pipelineID); + } } - void removePipeline(PipelineID pipelineID) { + public void removePipeline(PipelineID pipelineID) { activePipelines.remove(pipelineID); } @@ -117,17 +124,6 @@ void addOpenPipeline(Pipeline pipeline) { .addPipeline(pipeline.getId()); } - protected static int getReplicationCount(ReplicationFactor factor) { - switch (factor) { - case ONE: - return 1; - case THREE: - return 3; - default: - throw new IllegalArgumentException("Unexpected replication count"); - } - } - public abstract Pipeline allocatePipeline( ReplicationFactor replicationFactor); @@ -137,6 +133,14 @@ public abstract Pipeline allocatePipeline( */ public abstract void initializePipeline(Pipeline pipeline) throws IOException; + public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { + if (pipeline.addMember(dn) + && (pipeline.getDatanodes().size() == pipeline.getFactor().getNumber()) + && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) { + addOpenPipeline(pipeline); + } + } + /** * Creates a pipeline with a specified replication factor and type. * @param replicationFactor - Replication Factor.
@@ -157,27 +161,11 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor, * Remove the pipeline from active allocation. * @param pipeline pipeline to be finalized */ - public synchronized void finalizePipeline(Pipeline pipeline) { - activePipelines.get(pipeline.getFactor().ordinal()) - .removePipeline(pipeline.getId()); - } + public abstract boolean finalizePipeline(Pipeline pipeline); /** * * @param pipeline */ public abstract void closePipeline(Pipeline pipeline) throws IOException; - - /** - * list members in the pipeline. - * @return the datanode - */ - public abstract List getMembers(PipelineID pipelineID) - throws IOException; - - /** - * Update the datanode list of the pipeline. - */ - public abstract void updatePipeline(PipelineID pipelineID, - List newDatanodes) throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java new file mode 100644 index 0000000000..933792bee3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles Pipeline Reports from datanode.
+ */ +public class PipelineReportHandler implements + EventHandler { + + private static final Logger LOGGER = LoggerFactory + .getLogger(PipelineReportHandler.class); + private final PipelineSelector pipelineSelector; + + public PipelineReportHandler(PipelineSelector pipelineSelector) { + Preconditions.checkNotNull(pipelineSelector); + this.pipelineSelector = pipelineSelector; + } + + @Override + public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, + EventPublisher publisher) { + Preconditions.checkNotNull(pipelineReportFromDatanode); + DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + Preconditions.checkNotNull(dn, "Pipeline Report is " + + "missing DatanodeDetails."); + LOGGER.trace("Processing pipeline report for dn: {}", dn); + pipelineSelector.processPipelineReport(dn, pipelineReport); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 82946bdd7d..59d937ef7d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -16,9 +16,12 @@ */ package org.apache.hadoop.hdds.scm.pipelines; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; @@ -30,6 +33,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -75,11 +79,9 @@ public class PipelineSelector { private static final Logger LOG = LoggerFactory.getLogger(PipelineSelector.class); private final ContainerPlacementPolicy placementPolicy; - private final NodeManager nodeManager; + private final Map pipelineManagerMap; private final Configuration conf; private final EventPublisher eventPublisher; - private final RatisManagerImpl ratisManager; - private final StandaloneManagerImpl standaloneManager; private final long containerSize; private final MetadataStore pipelineStore; private final PipelineStateManager stateManager; @@ -96,7 +98,6 @@ public class PipelineSelector { */ public PipelineSelector(NodeManager nodeManager, Configuration conf, EventPublisher eventPublisher, int cacheSizeMB) throws IOException { - this.nodeManager = nodeManager; this.conf = conf; this.eventPublisher = eventPublisher; this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); @@ -106,12 +107,14 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf, StorageUnit.BYTES); node2PipelineMap = new Node2PipelineMap(); pipelineMap = new ConcurrentHashMap<>(); - this.standaloneManager = - new 
StandaloneManagerImpl(this.nodeManager, placementPolicy, - containerSize); - this.ratisManager = - new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, - conf); + pipelineManagerMap = new HashMap<>(); + + pipelineManagerMap.put(ReplicationType.STAND_ALONE, + new StandaloneManagerImpl(nodeManager, placementPolicy, + containerSize)); + pipelineManagerMap.put(ReplicationType.RATIS, + new RatisManagerImpl(nodeManager, placementPolicy, + containerSize, conf)); long pipelineCreationLeaseTimeout = conf.getTimeDuration( ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, @@ -154,6 +157,7 @@ private void reloadExistingPipelines() throws IOException { } } + @VisibleForTesting public Set getOpenContainerIDsByPipeline(PipelineID pipelineID) { return pipeline2ContainerMap.get(pipelineID); } @@ -226,30 +230,6 @@ private static ContainerPlacementPolicy createContainerPlacementPolicy( } } - /** - * Return the pipeline manager from the replication type. - * - * @param replicationType - Replication Type Enum. - * @return pipeline Manager. - * @throws IllegalArgumentException If an pipeline type gets added - * and this function is not modified we will throw. - */ - private PipelineManager getPipelineManager(ReplicationType replicationType) - throws IllegalArgumentException { - switch (replicationType) { - case RATIS: - return this.ratisManager; - case STAND_ALONE: - return this.standaloneManager; - case CHAINED: - throw new IllegalArgumentException("Not implemented yet"); - default: - throw new IllegalArgumentException("Unexpected enum found. Does not" + - " know how to handle " + replicationType.toString()); - } - - } - /** * This function is called by the Container Manager while allocating a new * container. The client specifies what kind of replication pipeline is needed @@ -260,7 +240,7 @@ private PipelineManager getPipelineManager(ReplicationType replicationType) public Pipeline getReplicationPipeline(ReplicationType replicationType, HddsProtos.ReplicationFactor replicationFactor) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); + PipelineManager manager = pipelineManagerMap.get(replicationType); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Getting replication pipeline forReplicationType {} :" + " ReplicationFactor {}", replicationType.toString(), @@ -316,7 +296,7 @@ public Pipeline getPipeline(PipelineID pipelineID) { * Finalize a given pipeline. */ public void finalizePipeline(Pipeline pipeline) throws IOException { - PipelineManager manager = getPipelineManager(pipeline.getType()); + PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING || pipeline.getLifeCycleState() == LifeCycleState.CLOSED) { @@ -327,17 +307,17 @@ public void finalizePipeline(Pipeline pipeline) throws IOException { } // Remove the pipeline from active allocation - manager.finalizePipeline(pipeline); - - LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId()); - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); - closePipelineIfNoOpenContainers(pipeline); + if (manager.finalizePipeline(pipeline)) { + LOG.info("Finalizing pipeline. 
pipelineID: {}", pipeline.getId()); + updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); + closePipelineIfNoOpenContainers(pipeline); + } } /** * Close a given pipeline. */ - public void closePipelineIfNoOpenContainers(Pipeline pipeline) + private void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOException { if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) { return; @@ -354,7 +334,7 @@ public void closePipelineIfNoOpenContainers(Pipeline pipeline) * Close a given pipeline. */ private void closePipeline(Pipeline pipeline) throws IOException { - PipelineManager manager = getPipelineManager(pipeline.getType()); + PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId()); HashSet containers = @@ -367,7 +347,7 @@ private void closePipeline(Pipeline pipeline) throws IOException { * Add to a given pipeline. */ private void addOpenPipeline(Pipeline pipeline) { - PipelineManager manager = getPipelineManager(pipeline.getType()); + PipelineManager manager = pipelineManagerMap.get(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId()); manager.addOpenPipeline(pipeline); @@ -381,7 +361,7 @@ private void closeContainersByPipeline(Pipeline pipeline) { } } - public Set getPipelineId(UUID dnId) { + public Set getPipelineByDnID(UUID dnId) { return node2PipelineMap.getPipelines(dnId); } @@ -400,6 +380,9 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException { pipelineMap.put(pipeline.getId(), pipeline); pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>()); node2PipelineMap.addPipeline(pipeline); + // reset the datanodes in the pipeline + // they will be reset on + pipeline.resetPipeline(); break; case CLOSED: // if the pipeline is in closed state, nothing to do. @@ -409,6 +392,36 @@ private void addExistingPipeline(Pipeline pipeline) throws IOException { } } + public void handleStaleNode(DatanodeDetails dn) { + Set pipelineIDs = getPipelineByDnID(dn.getUuid()); + for (PipelineID id : pipelineIDs) { + LOG.info("closing pipeline {}.", id); + eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); + } + } + + void processPipelineReport(DatanodeDetails dn, + PipelineReportsProto pipelineReport) { + Set reportedPipelines = new HashSet<>(); + pipelineReport.getPipelineReportList(). + forEach(p -> + reportedPipelines.add( + processPipelineReport(p.getPipelineID(), dn))); + + //TODO: handle missing pipelines and new pipelines later + } + + private PipelineID processPipelineReport( + HddsProtos.PipelineID id, DatanodeDetails dn) { + PipelineID pipelineID = PipelineID.getFromProtobuf(id); + Pipeline pipeline = pipelineMap.get(pipelineID); + if (pipeline != null) { + pipelineManagerMap.get(pipeline.getType()) + .processPipelineReport(pipeline, dn); + } + return pipelineID; + } + /** * Update the Pipeline State to the next state. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index d3cec882bb..905a5b553b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -73,20 +73,19 @@ public RatisManagerImpl(NodeManager nodeManager, public Pipeline allocatePipeline(ReplicationFactor factor) { List newNodesList = new LinkedList<>(); List datanodes = nodeManager.getNodes(NodeState.HEALTHY); - int count = getReplicationCount(factor); //TODO: Add Raft State to the Nodes, so we can query and skip nodes from // data from datanode instead of maintaining a set. for (DatanodeDetails datanode : datanodes) { Preconditions.checkNotNull(datanode); if (!ratisMembers.contains(datanode)) { newNodesList.add(datanode); - if (newNodesList.size() == count) { + if (newNodesList.size() == factor.getNumber()) { // once a datanode has been added to a pipeline, exclude it from // further allocations ratisMembers.addAll(newNodesList); PipelineID pipelineID = PipelineID.randomId(); LOG.info("Allocating a new ratis pipeline of size: {} id: {}", - count, pipelineID); + factor.getNumber(), pipelineID); return PipelineSelector.newPipelineFromNodes(newNodesList, ReplicationType.RATIS, factor, pipelineID); } @@ -103,6 +102,17 @@ public void initializePipeline(Pipeline pipeline) throws IOException { } } + public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { + super.processPipelineReport(pipeline, dn); + ratisMembers.add(dn); + } + + public synchronized boolean finalizePipeline(Pipeline pipeline) { + activePipelines.get(pipeline.getFactor().ordinal()) + .removePipeline(pipeline.getId()); + return true; + } + /** * Close the pipeline. */ @@ -116,29 +126,4 @@ public void closePipeline(Pipeline pipeline) throws IOException { Preconditions.checkArgument(ratisMembers.remove(node)); } } - - /** - * list members in the pipeline . - * - * @param pipelineID - * @return the datanode - */ - @Override - public List getMembers(PipelineID pipelineID) - throws IOException { - return null; - } - - /** - * Update the datanode list of the pipeline. 
- * - * @param pipelineID - * @param newDatanodes - */ - @Override - public void updatePipeline(PipelineID pipelineID, - List newDatanodes) - throws IOException { - - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index ed2fc2fe68..045afb6ceb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -74,18 +74,19 @@ public StandaloneManagerImpl(NodeManager nodeManager, public Pipeline allocatePipeline(ReplicationFactor factor) { List newNodesList = new LinkedList<>(); List datanodes = nodeManager.getNodes(NodeState.HEALTHY); - int count = getReplicationCount(factor); for (DatanodeDetails datanode : datanodes) { Preconditions.checkNotNull(datanode); if (!standAloneMembers.contains(datanode)) { newNodesList.add(datanode); - if (newNodesList.size() == count) { + if (newNodesList.size() == factor.getNumber()) { // once a datanode has been added to a pipeline, exclude it from // further allocations standAloneMembers.addAll(newNodesList); - PipelineID pipelineID = PipelineID.randomId(); + // Standalone pipelines use the first node's id as the pipeline id + PipelineID pipelineID = + PipelineID.valueOf(newNodesList.get(0).getUuid()); LOG.info("Allocating a new standalone pipeline of size: {} id: {}", - count, pipelineID); + factor.getNumber(), pipelineID); return PipelineSelector.newPipelineFromNodes(newNodesList, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID); } @@ -98,6 +99,17 @@ public void initializePipeline(Pipeline pipeline) { // Nothing to be done for standalone pipeline } + public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { + super.processPipelineReport(pipeline, dn); + standAloneMembers.add(dn); + } + + public synchronized boolean finalizePipeline(Pipeline pipeline) { + activePipelines.get(pipeline.getFactor().ordinal()) + .removePipeline(pipeline.getId()); + return false; + } + /** * Close the pipeline. */ @@ -107,28 +119,4 @@ public void closePipeline(Pipeline pipeline) throws IOException { Preconditions.checkArgument(standAloneMembers.remove(node)); } } - - /** - * list members in the pipeline . - * - * @param pipelineID - * @return the datanode - */ - @Override - public List getMembers(PipelineID pipelineID) - throws IOException { - return null; - } - - /** - * Update the datanode list of the pipeline.
- * - * @param pipelineID - * @param newDatanodes - */ - @Override - public void updatePipeline(PipelineID pipelineID, List - newDatanodes) throws IOException { - - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index a651f62371..e65de8ba35 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineActionsProto; import org.apache.hadoop.hdds.protocol.proto @@ -46,6 +48,7 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; /** * This class is responsible for dispatching heartbeat from datanode to @@ -103,6 +106,14 @@ public List dispatch(SCMHeartbeatRequestProto heartbeat) { heartbeat.getContainerActions())); } + if (heartbeat.hasPipelineReports()) { + LOG.debug("Dispatching Pipeline Report."); + eventPublisher.fireEvent(PIPELINE_REPORT, + new PipelineReportFromDatanode(datanodeDetails, + heartbeat.getPipelineReports())); + + } + if (heartbeat.hasPipelineActions()) { LOG.debug("Dispatching Pipeline Actions."); eventPublisher.fireEvent(PIPELINE_ACTIONS, @@ -178,6 +189,18 @@ public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails, } } + /** + * Pipeline report event payload with origin. + */ + public static class PipelineReportFromDatanode + extends ReportFromDatanode { + + public PipelineReportFromDatanode(DatanodeDetails datanodeDetails, + PipelineReportsProto report) { + super(datanodeDetails, report); + } + } + /** * Pipeline action event payload with origin. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 8a09dc899d..4a0d3e5f70 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -33,6 +33,8 @@ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto @@ -74,7 +76,10 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -102,6 +107,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT; import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; @@ -190,13 +196,14 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto public SCMRegisteredResponseProto register( HddsProtos.DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, - ContainerReportsProto containerReportsProto) + ContainerReportsProto containerReportsProto, + PipelineReportsProto pipelineReportsProto) throws IOException { DatanodeDetails datanodeDetails = DatanodeDetails .getFromProtoBuf(datanodeDetailsProto); // TODO : Return the list of Nodes that forms the SCM HA. 
RegisteredCommand registeredCommand = scm.getScmNodeManager() - .register(datanodeDetails, nodeReport); + .register(datanodeDetails, nodeReport, pipelineReportsProto); if (registeredCommand.getError() == SCMRegisteredResponseProto.ErrorCode.success) { scm.getScmContainerManager().processContainerReports(datanodeDetails, @@ -204,6 +211,9 @@ public SCMRegisteredResponseProto register( eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT, new NodeRegistrationContainerReport(datanodeDetails, containerReportsProto)); + eventPublisher.fireEvent(PIPELINE_REPORT, + new PipelineReportFromDatanode(datanodeDetails, + pipelineReportsProto)); } return getRegisteredResponse(registeredCommand); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 8e76606387..21691490ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler; import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler; +import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -217,13 +218,16 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { new CloseContainerEventHandler(scmContainerManager); NodeReportHandler nodeReportHandler = new NodeReportHandler(scmNodeManager); - + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler( + scmContainerManager.getPipelineSelector()); CommandStatusReportHandler cmdStatusReportHandler = new CommandStatusReportHandler(); NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); StaleNodeHandler staleNodeHandler = - new StaleNodeHandler(node2ContainerMap, scmContainerManager); + new StaleNodeHandler(node2ContainerMap, + scmContainerManager.getPipelineSelector()); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap, getScmContainerManager().getStateManager()); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); @@ -240,7 +244,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { new PipelineActionEventHandler(); PipelineCloseHandler pipelineCloseHandler = - new PipelineCloseHandler(scmContainerManager); + new PipelineCloseHandler(scmContainerManager.getPipelineSelector()); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, @@ -300,6 +304,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, (BlockManagerImpl) scmBlockManager); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer); + eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); registerMXBean(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 7af9dda4fb..24a16c77bb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.mockito.Mockito; import static org.mockito.Mockito.when; @@ -139,7 +141,8 @@ private static DatanodeDetails createDatanodeDetails(String uuid, public static DatanodeDetails createRandomDatanodeAndRegister( SCMNodeManager nodeManager) { return getDatanodeDetails( - nodeManager.register(randomDatanodeDetails(), null)); + nodeManager.register(randomDatanodeDetails(), null, + getRandomPipelineReports())); } /** @@ -299,6 +302,11 @@ public static ContainerReportsProto getRandomContainerReports( return getContainerReports(containerInfos); } + + public static PipelineReportsProto getRandomPipelineReports() { + return PipelineReportsProto.newBuilder().build(); + } + /** * Creates container report with the given ContainerInfo(s). * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 088b7005cd..21e44a3168 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -16,6 +16,8 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -356,7 +358,7 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { */ @Override public RegisteredCommand register(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport) { + NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto) { return null; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index f438c8bc23..cbe96eee84 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -286,7 +286,8 @@ public void testScmHeartbeatAfterRestart() throws Exception { TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null); try (SCMNodeManager nodemanager = createNodeManager(conf)) { nodemanager.register(datanodeDetails, - TestUtils.createNodeReport(report)); + TestUtils.createNodeReport(report), + TestUtils.getRandomPipelineReports()); List command = nodemanager.processHeartbeat(datanodeDetails); Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); Assert.assertTrue("On regular HB calls, SCM responses a " @@ -1122,7 +1123,8 @@ public void testHandlingSCMCommandEvent() { eq.addHandler(DATANODE_COMMAND, nodemanager); nodemanager - .register(datanodeDetails, TestUtils.createNodeReport(report)); + .register(datanodeDetails, TestUtils.createNodeReport(report), + TestUtils.getRandomPipelineReports()); eq.fireEvent(DATANODE_COMMAND, new CommandForDatanode<>(datanodeDetails.getUuid(), new CloseContainerCommand(1L, 
ReplicationType.STAND_ALONE, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java index 14a74e9b16..ec1d5279d0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java @@ -116,7 +116,7 @@ public void testProcessReportCheckOneNode() throws SCMException { Assert.assertTrue(map.isKnownDatanode(key)); ReportResult result = map.processReport(key, values); Assert.assertEquals(result.getStatus(), - Node2ContainerMap.ReportStatus.ALL_IS_WELL); + ReportResult.ReportStatus.ALL_IS_WELL); } @Test @@ -181,9 +181,9 @@ public void testProcessReportDetectNewDataNode() throws SCMException { UUID key = getFirstKey(); TreeSet values = testData.get(key); ReportResult result = map.processReport(key, values); - Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND, + Assert.assertEquals(ReportResult.ReportStatus.NEW_DATANODE_FOUND, result.getStatus()); - Assert.assertEquals(result.getNewContainers().size(), values.size()); + Assert.assertEquals(result.getNewEntries().size(), values.size()); } /** @@ -216,15 +216,15 @@ public void testProcessReportDetectNewContainers() throws SCMException { ReportResult result = map.processReport(key, newContainersSet); //Assert that expected size of missing container is same as addedContainers - Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND, + Assert.assertEquals(ReportResult.ReportStatus.NEW_ENTRIES_FOUND, result.getStatus()); Assert.assertEquals(addedContainers.size(), - result.getNewContainers().size()); + result.getNewEntries().size()); // Assert that the Container IDs are the same as we added new. Assert.assertTrue("All objects are not removed.", - result.getNewContainers().removeAll(addedContainers)); + result.getNewEntries().removeAll(addedContainers)); } /** @@ -261,14 +261,14 @@ public void testProcessReportDetectMissingContainers() throws SCMException { //Assert that expected size of missing container is same as addedContainers - Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS, + Assert.assertEquals(ReportResult.ReportStatus.MISSING_ENTRIES, result.getStatus()); Assert.assertEquals(removedContainers.size(), - result.getMissingContainers().size()); + result.getMissingEntries().size()); // Assert that the Container IDs are the same as we added new. Assert.assertTrue("All missing containers not found.", - result.getMissingContainers().removeAll(removedContainers)); + result.getMissingEntries().removeAll(removedContainers)); } @Test @@ -307,21 +307,21 @@ public void testProcessReportDetectNewAndMissingContainers() throws Assert.assertEquals( - Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND, + ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND, result.getStatus()); Assert.assertEquals(removedContainers.size(), - result.getMissingContainers().size()); + result.getMissingEntries().size()); // Assert that the Container IDs are the same as we added new. 
Assert.assertTrue("All missing containers not found.", - result.getMissingContainers().removeAll(removedContainers)); + result.getMissingEntries().removeAll(removedContainers)); Assert.assertEquals(insertedSet.size(), - result.getNewContainers().size()); + result.getNewEntries().size()); // Assert that the Container IDs are the same as we added new. Assert.assertTrue("All inserted containers are not found.", - result.getNewContainers().removeAll(insertedSet)); + result.getNewEntries().removeAll(insertedSet)); } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index a513f6c54e..390746f4dc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -280,7 +280,8 @@ public void testRegister() throws Exception { .register(nodeToRegister.getProtoBufMessage(), TestUtils .createNodeReport( getStorageReports(nodeToRegister.getUuid())), - TestUtils.getRandomContainerReports(10)); + TestUtils.getRandomContainerReports(10), + TestUtils.getRandomPipelineReports()); Assert.assertNotNull(responseProto); Assert.assertEquals(nodeToRegister.getUuidString(), responseProto.getDatanodeUUID()); @@ -308,6 +309,8 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, .createNodeReport(getStorageReports(UUID.randomUUID()))); when(ozoneContainer.getContainerReport()).thenReturn( TestUtils.getRandomContainerReports(10)); + when(ozoneContainer.getPipelineReport()).thenReturn( + TestUtils.getRandomPipelineReports()); RegisterEndpointTask endpointTask = new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer, mock(StateContext.class)); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index a0249aaa0e..e8a6892895 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.container.testutils; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.CommandQueue; @@ -252,7 +254,8 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { */ @Override public RegisteredCommand register(DatanodeDetails dd, - NodeReportProto nodeReport) { + NodeReportProto nodeReport, + PipelineReportsProto pipelineReportsProto) { return null; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index aefa6b0c5a..ad3798ea36 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -98,7 +98,7 @@ public void testPipelineMap() throws IOException { // get pipeline details by dnid Set pipelines = mapping.getPipelineSelector() - .getPipelineId(dns.get(0).getUuid()); + .getPipelineByDnID(dns.get(0).getUuid()); Assert.assertEquals(1, pipelines.size()); pipelines.forEach(p -> Assert.assertEquals(p, ratisContainer.getPipeline().getId())); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index a5828e117a..5eabfb9106 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -119,7 +119,7 @@ public void testPipelineCloseWithClosedContainer() throws IOException { HddsProtos.LifeCycleState.CLOSED); for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { // Assert that the pipeline has been removed from Node2PipelineMap as well - Assert.assertEquals(pipelineSelector.getPipelineId( + Assert.assertEquals(pipelineSelector.getPipelineByDnID( dn.getUuid()).size(), 0); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index 3999d76658..fb94b3c221 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -87,7 +88,7 @@ public static void shutdown() { } @Test - public void testPipelineWithScmRestart() { + public void testPipelineWithScmRestart() throws IOException { // After restart make sure that the pipeline are still present Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector() .getPipeline(ratisPipeline1.getId()); @@ -97,5 +98,22 @@ public void testPipelineWithScmRestart() { Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2); Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1); Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2); + + for (DatanodeDetails dn : ratisPipeline1.getMachines()) { + Assert.assertEquals(dn, ratisPipeline1AfterRestart.getDatanodes() + .get(dn.getUuidString())); + } + + for (DatanodeDetails dn : ratisPipeline2.getMachines()) { + Assert.assertEquals(dn, ratisPipeline2AfterRestart.getDatanodes() + .get(dn.getUuidString())); + } + + // Try creating a new ratis pipeline; it should reuse the pipeline + // that existed before the restart + Pipeline newRatisPipeline = + newMapping.allocateContainer(RATIS, THREE, "Owner1") + .getPipeline(); + Assert.assertEquals(newRatisPipeline.getId(), ratisPipeline1.getId()); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 1cb2cda87d..a83c16e8e8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -36,8 +36,12 @@ import java.io.Closeable; import java.io.IOException; import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + /** * Helpers for Ratis tests. */ @@ -60,6 +64,7 @@ class RatisTestSuite implements Closeable { public RatisTestSuite() throws IOException, TimeoutException, InterruptedException { conf = newOzoneConfiguration(RPC); + cluster = newMiniOzoneCluster(NUM_DATANODES, conf); } @@ -96,6 +101,8 @@ static OzoneConfiguration newOzoneConfiguration(RpcType rpc) { static void initRatisConf(RpcType rpc, Configuration conf) { conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name()); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY + " = " + rpc.name()); } @@ -104,6 +111,8 @@ static MiniOzoneCluster newMiniOzoneCluster( int numDatanodes, OzoneConfiguration conf) throws IOException, TimeoutException, InterruptedException { final MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) + .setHbInterval(1000) + .setHbProcessorInterval(1000) .setNumDatanodes(numDatanodes).build(); cluster.waitForClusterToBeReady(); return cluster; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index 6377f11bac..02cd985609 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -136,6 +136,7 @@ public void init() throws Exception { ozoneCluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(1) .setHbInterval(1000) + .setHbProcessorInterval(1000) .build(); ozoneCluster.waitForClusterToBeReady(); client = new RpcClient(conf); @@ -328,7 +329,6 @@ private static void restartDatanode(MiniOzoneCluster cluster, int datanodeIdx) cluster.restartHddsDatanode(datanodeIdx); } - @Ignore("Causes a JVm exit") @Test public void testPutAndGetKeyWithDnRestart() throws Exception { runTestPutAndGetKeyWithDnRestart( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java index 915d0f6b65..2e8f539452 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java @@ -26,7 +26,6 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.junit.Ignore; import org.junit.rules.Timeout; import static org.apache.hadoop.ozone.web.client @@ -83,7 +82,6 @@ public void testPutKey() throws Exception { 
getMultiPartKey(delimiter))); } - @Ignore("disabling for now, datanodes restart with ratis is buggy") @Test public void testPutAndGetKeyWithDnRestart() throws Exception { runTestPutAndGetKeyWithDnRestart( diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 275ae6e1d8..caf6d4f9d3 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -101,7 +101,7 @@ 1.0.0-M33 - 0.3.0-50588bd-SNAPSHOT + 0.3.0-eca3531-SNAPSHOT 1.0-alpha-1 3.3.1 2.4.12
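The diffing step in Node2ObjectsMap.processReport above reduces to two set differences: reported-minus-known yields new entries, known-minus-reported yields missing ones. A minimal, runnable restatement of that step (ReportDiffDemo and the sample IDs are invented for illustration, not part of the patch):

import java.util.Set;
import java.util.TreeSet;

// Toy re-creation of the set-difference step in processReport.
public final class ReportDiffDemo {
  public static void main(String[] args) {
    Set<Long> knownByScm = Set.of(1L, 2L, 3L);    // what SCM tracks
    Set<Long> reportedByDn = Set.of(2L, 3L, 4L);  // what the DN reports

    // Entries the datanode reports that SCM does not know about.
    Set<Long> newEntries = new TreeSet<>(reportedByDn);
    newEntries.removeAll(knownByScm);

    // Entries SCM expected but the datanode no longer reports.
    Set<Long> missingEntries = new TreeSet<>(knownByScm);
    missingEntries.removeAll(reportedByDn);

    System.out.println("new: " + newEntries);         // new: [4]
    System.out.println("missing: " + missingEntries); // missing: [1]
  }
}

The four status branches in processReport are just the cross product of these two sets being empty or not.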
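ReportResult.ReportResultBuilder substitutes empty sets for whichever entry sets a caller leaves unset, so getMissingEntries()/getNewEntries() never return null. A compact sketch of the same null-safe builder pattern, under hypothetical names:

import java.util.Collections;
import java.util.Set;

// Minimal builder mirroring the null-safe defaulting in ReportResult.build().
public final class ResultDemo {
  private final Set<String> missing;
  private final Set<String> added;

  private ResultDemo(Set<String> missing, Set<String> added) {
    this.missing = missing;
    this.added = added;
  }

  static final class Builder {
    private Set<String> missing;
    private Set<String> added;

    Builder setMissing(Set<String> m) { this.missing = m; return this; }
    Builder setAdded(Set<String> a) { this.added = a; return this; }

    ResultDemo build() {
      // A caller usually sets only one of the two sets; substitute an
      // immutable empty set for the other so getters never return null.
      return new ResultDemo(
          missing == null ? Collections.emptySet() : missing,
          added == null ? Collections.emptySet() : added);
    }
  }

  public static void main(String[] args) {
    ResultDemo r = new Builder().setAdded(Set.of("container-4")).build();
    System.out.println(r.added);   // [container-4]
    System.out.println(r.missing); // [] rather than null
  }
}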
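Pipeline membership after an SCM restart is rebuilt exactly the way the PipelineSelector and PipelineManager hunks suggest: addExistingPipeline() clears the stale datanode list, and each pipeline report re-adds one member until the pipeline is full and OPEN, at which point it re-enters active allocation. A toy model of that sequence (all names invented):

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Toy model of relearning pipeline membership from datanode reports.
public final class PipelineRebuildDemo {
  private final int factor;
  private final Set<UUID> members = new HashSet<>();
  private boolean activeForAllocation = false;

  PipelineRebuildDemo(int factor) { this.factor = factor; }

  // Corresponds to resetPipeline(): membership is cleared on SCM restart.
  void reset() {
    members.clear();
    activeForAllocation = false;
  }

  // Corresponds to PipelineManager.processPipelineReport(): the pipeline
  // becomes allocatable again only when the last expected member reports.
  void onReport(UUID datanode) {
    if (members.add(datanode) && members.size() == factor) {
      activeForAllocation = true;
    }
  }

  public static void main(String[] args) {
    PipelineRebuildDemo p = new PipelineRebuildDemo(3);
    p.reset();
    p.onReport(UUID.randomUUID());
    p.onReport(UUID.randomUUID());
    System.out.println(p.activeForAllocation); // false, one member missing
    p.onReport(UUID.randomUUID());
    System.out.println(p.activeForAllocation); // true, pipeline is full
  }
}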
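finalizePipeline is now abstract and returns a boolean, and PipelineSelector runs the FINALIZE transition and the container-close path only when the manager answers true; the Ratis implementation returns true while the standalone one returns false. A toy rendering of that contract (class names invented):

// Toy rendering of the boolean finalize contract between the selector
// and its per-type pipeline managers.
public final class FinalizeContractDemo {
  interface Manager {
    // true: caller should run FINALIZE + close; false: nothing more to do.
    boolean finalizePipeline(String pipelineId);
  }

  static final class RatisLikeManager implements Manager {
    @Override
    public boolean finalizePipeline(String id) {
      System.out.println("removed " + id + " from active allocation");
      return true;
    }
  }

  static final class StandaloneLikeManager implements Manager {
    @Override
    public boolean finalizePipeline(String id) {
      System.out.println("removed " + id + " from active allocation");
      return false; // standalone pipelines skip the lifecycle transition
    }
  }

  public static void main(String[] args) {
    for (Manager m : new Manager[] {
        new RatisLikeManager(), new StandaloneLikeManager()}) {
      if (m.finalizePipeline("pipeline-1")) {
        System.out.println("firing FINALIZE and closing containers");
      }
    }
  }
}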
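The StandaloneManagerImpl hunk replaces PipelineID.randomId() with an id derived from the first node's UUID, so re-allocating the same one-node pipeline, or replaying its report after a restart, lands on the same id. The effect, sketched with plain UUIDs (DemoPipelineId stands in for PipelineID; the UUID literal is arbitrary):

import java.util.UUID;

// Deriving a stable id from a node UUID, as the standalone manager now does.
public final class StablePipelineIdDemo {
  record DemoPipelineId(UUID id) {
    static DemoPipelineId valueOf(UUID id) { return new DemoPipelineId(id); }
  }

  public static void main(String[] args) {
    UUID firstNode = UUID.fromString("c9d3a0be-7d5c-4f70-9ab3-6f2b7c1a2d3e");
    DemoPipelineId before = DemoPipelineId.valueOf(firstNode);
    DemoPipelineId after = DemoPipelineId.valueOf(firstNode); // e.g. post-restart
    System.out.println(before.equals(after)); // true: allocation is idempotent
  }
}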
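The SCMDatanodeHeartbeatDispatcher and StorageContainerManager hunks follow SCM's existing typed-event pattern: the dispatcher wraps a heartbeat section with the reporting datanode and fires an event, and a handler registered on the queue consumes it. A stripped-down sketch of that publish/subscribe shape (ToyEventQueueDemo is a stand-in, not the Hadoop EventQueue API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Stripped-down publish/subscribe in the shape of SCM's event wiring.
public final class ToyEventQueueDemo {
  static final class Queue {
    private final Map<String, List<BiConsumer<Object, Queue>>> handlers =
        new HashMap<>();

    void addHandler(String event, BiConsumer<Object, Queue> handler) {
      handlers.computeIfAbsent(event, k -> new ArrayList<>()).add(handler);
    }

    void fireEvent(String event, Object payload) {
      handlers.getOrDefault(event, List.of())
          .forEach(h -> h.accept(payload, this));
    }
  }

  public static void main(String[] args) {
    Queue queue = new Queue();
    // Registration side, like eventQueue.addHandler(PIPELINE_REPORT, ...).
    queue.addHandler("PIPELINE_REPORT",
        (payload, q) -> System.out.println("processing " + payload));
    // Dispatch side, like the heartbeat dispatcher firing the event.
    queue.fireEvent("PIPELINE_REPORT", "report from dn-1");
  }
}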