HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton.

This commit is contained in:
Hanisha Koneru 2018-09-07 11:20:25 -07:00
parent ff64d35716
commit ab90248b30
8 changed files with 279 additions and 76 deletions

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
@ -23,18 +22,16 @@
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@ -59,22 +56,21 @@ public class ContainerReportHandler implements
private ReplicationActivityStatus replicationStatus;
public ContainerReportHandler(Mapping containerMapping,
Node2ContainerMap node2ContainerMap,
ReplicationActivityStatus replicationActivityStatus) {
Preconditions.checkNotNull(containerMapping);
Preconditions.checkNotNull(node2ContainerMap);
Preconditions.checkNotNull(replicationActivityStatus);
this.containerStateManager = containerMapping.getStateManager();
this.containerMapping = containerMapping;
this.node2ContainerMap = node2ContainerMap;
this.containerStateManager = containerMapping.getStateManager();
this.replicationStatus = replicationActivityStatus;
}
@Override
public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
EventPublisher publisher) {
EventPublisher publisher) {
DatanodeDetails datanodeOrigin =
containerReportFromDatanode.getDatanodeDetails();
@ -88,7 +84,8 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
.processContainerReports(datanodeOrigin, containerReport, false);
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
.map(containerProto -> containerProto.getContainerID())
.map(StorageContainerDatanodeProtocolProtos
.ContainerInfo::getContainerID)
.map(ContainerID::new)
.collect(Collectors.toSet());
@ -102,13 +99,12 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
for (ContainerID containerID : reportResult.getMissingContainers()) {
containerStateManager
.removeContainerReplica(containerID, datanodeOrigin);
emitReplicationRequestEvent(containerID, publisher);
checkReplicationState(containerID, publisher);
}
for (ContainerID containerID : reportResult.getNewContainers()) {
containerStateManager.addContainerReplica(containerID, datanodeOrigin);
emitReplicationRequestEvent(containerID, publisher);
checkReplicationState(containerID, publisher);
}
} catch (IOException e) {
@ -119,8 +115,9 @@ public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
}
private void emitReplicationRequestEvent(ContainerID containerID,
EventPublisher publisher) throws SCMException {
private void checkReplicationState(ContainerID containerID,
EventPublisher publisher)
throws SCMException {
ContainerInfo container = containerStateManager.getContainer(containerID);
if (container == null) {
@ -134,18 +131,18 @@ private void emitReplicationRequestEvent(ContainerID containerID,
if (container.isContainerOpen()) {
return;
}
if (replicationStatus.isReplicationEnabled()) {
int existingReplicas =
containerStateManager.getContainerReplicas(containerID).size();
int expectedReplicas = container.getReplicationFactor().getNumber();
if (existingReplicas != expectedReplicas) {
ReplicationRequest replicationState =
containerStateManager.checkReplicationState(containerID);
if (replicationState != null) {
if (replicationStatus.isReplicationEnabled()) {
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(containerID.getId(), existingReplicas,
container.getReplicationFactor().getNumber()));
replicationState);
} else {
LOG.warn(
"Over/under replicated container but the replication is not "
+ "(yet) enabled: "
+ replicationState.toString());
}
}

View File

@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -27,6 +28,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@ -40,6 +42,7 @@
.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -148,7 +151,7 @@ public ContainerStateManager(Configuration configuration,
finalStates);
initializeStateMachine();
this.containerSize =(long)configuration.getStorageSize(
this.containerSize = (long) configuration.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
@ -399,7 +402,7 @@ public ContainerInfo getMatchingContainer(final long size,
// container ID.
ContainerState key = new ContainerState(owner, type, factor);
ContainerID lastID = lastUsedMap.get(key);
if(lastID == null) {
if (lastID == null) {
lastID = matchingSet.first();
}
@ -426,7 +429,7 @@ public ContainerInfo getMatchingContainer(final long size,
selectedContainer = findContainerWithSpace(size, resultSet, owner);
}
// Update the allocated Bytes on this container.
if(selectedContainer != null) {
if (selectedContainer != null) {
selectedContainer.updateAllocatedBytes(size);
}
return selectedContainer;
@ -539,9 +542,36 @@ public boolean removeContainerReplica(ContainerID containerID,
DatanodeDetails dn) throws SCMException {
return containers.removeContainerReplica(containerID, dn);
}
/**
* Compare the existing replication number with the expected one.
*/
public ReplicationRequest checkReplicationState(ContainerID containerID)
throws SCMException {
int existingReplicas = getContainerReplicas(containerID).size();
int expectedReplicas = getContainer(containerID)
.getReplicationFactor().getNumber();
if (existingReplicas != expectedReplicas) {
return new ReplicationRequest(containerID.getId(), existingReplicas,
expectedReplicas);
}
return null;
}
/**
* Checks if the container is open.
*/
public boolean isOpen(ContainerID containerID) {
Preconditions.checkNotNull(containerID);
ContainerInfo container = Preconditions
.checkNotNull(getContainer(containerID),
"Container can't be found " + containerID);
return container.isContainerOpen();
}
@VisibleForTesting
public ContainerStateMap getContainerStateMap() {
return containers;
}
}

View File

@ -23,6 +23,8 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventHandler;
@ -62,6 +64,16 @@ public void onMessage(DatanodeDetails datanodeDetails,
try {
containerStateManager.removeContainerReplica(container,
datanodeDetails);
if (!containerStateManager.isOpen(container)) {
ReplicationRequest replicationRequest =
containerStateManager.checkReplicationState(container);
if (replicationRequest != null) {
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
replicationRequest);
}
}
} catch (SCMException e) {
LOG.error("Can't remove container from containerStateMap {}", container
.getId(), e);

View File

@ -17,6 +17,11 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
@ -31,12 +36,18 @@
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -376,5 +387,39 @@ public static CommandStatusReportsProto createCommandStatusReport(
return report.build();
}
public static
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
allocateContainer(ContainerStateManager containerStateManager)
throws IOException {
PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class);
Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED,
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.THREE,
PipelineID.randomId());
when(pipelineSelector
.getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline);
return containerStateManager
.allocateContainer(pipelineSelector,
HddsProtos.ReplicationType.STAND_ALONE,
HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo();
}
public static void closeContainer(ContainerStateManager containerStateManager,
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
container)
throws SCMException {
containerStateManager.getContainerStateMap()
.updateState(container, container.getState(), LifeCycleState.CLOSING);
containerStateManager.getContainerStateMap()
.updateState(container, container.getState(), LifeCycleState.CLOSED);
}
}

View File

@ -71,9 +71,7 @@ public void resetEventCollector() {
@Test
public void test() throws IOException {
//given
//GIVEN
OzoneConfiguration conf = new OzoneConfiguration();
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
Mapping mapping = Mockito.mock(Mapping.class);
@ -133,19 +131,9 @@ public void test() throws IOException {
long c3 = cont3.getContainerID();
// Close remaining containers
try {
containerStateManager.getContainerStateMap()
.updateState(cont1, cont1.getState(), LifeCycleState.CLOSING);
containerStateManager.getContainerStateMap()
.updateState(cont1, cont1.getState(), LifeCycleState.CLOSED);
containerStateManager.getContainerStateMap()
.updateState(cont2, cont2.getState(), LifeCycleState.CLOSING);
containerStateManager.getContainerStateMap()
.updateState(cont2, cont2.getState(), LifeCycleState.CLOSED);
TestUtils.closeContainer(containerStateManager, cont1);
TestUtils.closeContainer(containerStateManager, cont2);
} catch (IOException e) {
LOG.info("Failed to change state of open containers.", e);
}
//when
//initial reports before replication is enabled. 2 containers w 3 replicas.

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Testing ContainerStatemanager.
*/
public class TestContainerStateManager {
private ContainerStateManager containerStateManager;
@Before
public void init() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
Mapping mapping = Mockito.mock(Mapping.class);
containerStateManager = new ContainerStateManager(conf, mapping);
}
@Test
public void checkReplicationStateOK() throws IOException {
//GIVEN
ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
DatanodeDetails d3 = TestUtils.randomDatanodeDetails();
addReplica(c1, d1);
addReplica(c1, d2);
addReplica(c1, d3);
//WHEN
ReplicationRequest replicationRequest = containerStateManager
.checkReplicationState(new ContainerID(c1.getContainerID()));
//THEN
Assert.assertNull(replicationRequest);
}
@Test
public void checkReplicationStateMissingReplica() throws IOException {
//GIVEN
ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager);
DatanodeDetails d1 = TestUtils.randomDatanodeDetails();
DatanodeDetails d2 = TestUtils.randomDatanodeDetails();
addReplica(c1, d1);
addReplica(c1, d2);
//WHEN
ReplicationRequest replicationRequest = containerStateManager
.checkReplicationState(new ContainerID(c1.getContainerID()));
Assert
.assertEquals(c1.getContainerID(), replicationRequest.getContainerId());
Assert.assertEquals(2, replicationRequest.getReplicationCount());
Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
}
private void addReplica(ContainerInfo c1, DatanodeDetails d1) {
containerStateManager
.addContainerReplica(new ContainerID(c1.getContainerID()), d1);
}
}

View File

@ -18,76 +18,76 @@
package org.apache.hadoop.hdds.scm.node;
import java.util.HashSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import static org.mockito.Matchers.eq;
import org.mockito.Mockito;
/**
* Test DeadNodeHandler.
*/
public class TestDeadNodeHandler {
private List<ReplicationRequest> sentEvents = new ArrayList<>();
@Test
public void testOnMessage() throws SCMException {
public void testOnMessage() throws IOException {
//GIVEN
DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
ContainerInfo container1 = TestUtils.getRandomContainerInfo(1);
ContainerInfo container2 = TestUtils.getRandomContainerInfo(2);
ContainerInfo container3 = TestUtils.getRandomContainerInfo(3);
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
ContainerStateManager containerStateManager = new ContainerStateManager(
new OzoneConfiguration(),
Mockito.mock(Mapping.class)
);
ContainerInfo container1 =
TestUtils.allocateContainer(containerStateManager);
ContainerInfo container2 =
TestUtils.allocateContainer(containerStateManager);
ContainerInfo container3 =
TestUtils.allocateContainer(containerStateManager);
DeadNodeHandler handler =
new DeadNodeHandler(node2ContainerMap, containerStateManager);
node2ContainerMap
.insertNewDatanode(datanode1.getUuid(), new HashSet<ContainerID>() {{
add(new ContainerID(container1.getContainerID()));
add(new ContainerID(container2.getContainerID()));
}});
registerReplicas(node2ContainerMap, datanode1, container1, container2);
registerReplicas(node2ContainerMap, datanode2, container1, container3);
node2ContainerMap
.insertNewDatanode(datanode2.getUuid(), new HashSet<ContainerID>() {{
add(new ContainerID(container1.getContainerID()));
add(new ContainerID(container3.getContainerID()));
}});
registerReplicas(containerStateManager, container1, datanode1, datanode2);
registerReplicas(containerStateManager, container2, datanode1);
registerReplicas(containerStateManager, container3, datanode2);
containerStateManager.getContainerStateMap()
.addContainerReplica(new ContainerID(container1.getContainerID()),
datanode1, datanode2);
TestUtils.closeContainer(containerStateManager, container1);
containerStateManager.getContainerStateMap()
.addContainerReplica(new ContainerID(container2.getContainerID()),
datanode1);
containerStateManager.getContainerStateMap()
.addContainerReplica(new ContainerID(container3.getContainerID()),
datanode2);
EventPublisher publisher = Mockito.mock(EventPublisher.class);
//WHEN datanode1 is dead
handler.onMessage(datanode1, Mockito.mock(EventPublisher.class));
handler.onMessage(datanode1, publisher);
//THEN
//node2ContainerMap has not been changed
Assert.assertEquals(2, node2ContainerMap.size());
@ -108,5 +108,40 @@ public void testOnMessage() throws SCMException {
Assert.assertEquals(1, container3Replicas.size());
Assert.assertEquals(datanode2, container3Replicas.iterator().next());
ArgumentCaptor<ReplicationRequest> replicationRequestParameter =
ArgumentCaptor.forClass(ReplicationRequest.class);
Mockito.verify(publisher)
.fireEvent(eq(SCMEvents.REPLICATE_CONTAINER),
replicationRequestParameter.capture());
Assert
.assertEquals(container1.getContainerID(),
replicationRequestParameter.getValue().getContainerId());
Assert
.assertEquals(1,
replicationRequestParameter.getValue().getReplicationCount());
Assert
.assertEquals(3,
replicationRequestParameter.getValue().getExpecReplicationCount());
}
private void registerReplicas(ContainerStateManager containerStateManager,
ContainerInfo container, DatanodeDetails... datanodes) {
containerStateManager.getContainerStateMap()
.addContainerReplica(new ContainerID(container.getContainerID()),
datanodes);
}
private void registerReplicas(Node2ContainerMap node2ContainerMap,
DatanodeDetails datanode,
ContainerInfo... containers)
throws SCMException {
node2ContainerMap
.insertNewDatanode(datanode.getUuid(),
Arrays.stream(containers)
.map(container -> new ContainerID(container.getContainerID()))
.collect(Collectors.toSet()));
}
}

View File

@ -48,7 +48,7 @@
/**
* Tests for ContainerStateManager.
*/
public class TestContainerStateManager {
public class TestContainerStateManagerIntegration {
private OzoneConfiguration conf;
private MiniOzoneCluster cluster;