diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index d1895a8985..cb677c272c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -19,11 +19,13 @@
 import java.io.FileInputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -44,6 +46,7 @@
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,13 +100,19 @@ public void handle(SCMCommand command, OzoneContainer container,
     ReplicateContainerCommand replicateCommand =
         (ReplicateContainerCommand) command;
     try {
-
+      List<DatanodeDetails> sourceDatanodes =
+          replicateCommand.getSourceDatanodes();
       long containerID = replicateCommand.getContainerID();
+
+      Preconditions.checkArgument(sourceDatanodes.size() > 0,
+          String.format("Replication command is received for container %d "
+              + "but the size of source datanodes was 0.", containerID));
+
       LOG.info("Starting replication of container {} from {}", containerID,
-          replicateCommand.getSourceDatanodes());
+          sourceDatanodes);
       CompletableFuture<Path> tempTarFile = downloader
           .getContainerDataFromReplicas(containerID,
-              replicateCommand.getSourceDatanodes());
+              sourceDatanodes);
 
       CompletableFuture<Void> result =
           tempTarFile.thenAccept(path -> {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
index 6a14d333e2..6529922fa5 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -106,7 +106,6 @@ public void handleWithErrors() throws TimeoutException, InterruptedException {
     handler.handle(command, null, Mockito.mock(StateContext.class), null);
 
     //THEN
-
     TestGenericTestUtils
         .waitFor(() -> downloader.futureByContainers.size() == 1, 100,
             2000);
@@ -124,6 +123,24 @@ public void handleWithErrors() throws TimeoutException, InterruptedException {
             2000);
   }
 
+  /**
+   * Can't handle a command if there are no source replicas.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void handleWithoutReplicas()
+      throws TimeoutException, InterruptedException {
+    //GIVEN
+    ReplicateContainerCommand commandWithoutReplicas =
+        new ReplicateContainerCommand(1L, new ArrayList<>());
+
+    //WHEN
+    handler
+        .handle(commandWithoutReplicas,
+            null,
+            Mockito.mock(StateContext.class),
+            null);
+
+  }
   private static class StubDownloader implements ContainerDownloader {
 
     private Map<Long, CompletableFuture<Path>> futureByContainers =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4a980f77a6..ddecdbcfa5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -116,7 +117,15 @@ public void run() {
 
           //check the current replication
           List<DatanodeDetails> datanodesWithReplicas =
-              getCurrentReplicas(request);
+              new ArrayList<>(getCurrentReplicas(request));
+
+          if (datanodesWithReplicas.size() == 0) {
+            LOG.warn(
+                "Container {} should be replicated but can't find any existing "
+                    + "replicas",
+                containerID);
+            return;
+          }
 
           ReplicationRequest finalRequest = request;
 
@@ -165,11 +174,10 @@ public void run() {
   }
 
   @VisibleForTesting
-  protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+  protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
       throws IOException {
-    //TODO: replication information is not yet available after HDDS-175,
-    // should be fixed after HDDS-228
-    return new ArrayList<>();
+    return containerStateManager
+        .getContainerReplicas(new ContainerID(request.getContainerId()));
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index da0591392a..06beb7c174 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -26,27 +28,22 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_REPLICATE_COMMAND;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,6 +66,8 @@ public class TestReplicationManager {
   private ContainerPlacementPolicy containerPlacementPolicy;
   private List<DatanodeDetails> listOfDatanodeDetails;
+  private LeaseManager<Long> leaseManager;
+  private ReplicationManager replicationManager;
 
   @Before
   public void initReplicationManager() throws IOException {
@@ -86,7 +85,6 @@ public void initReplicationManager() throws IOException {
 
     containerStateManager = Mockito.mock(ContainerStateManager.class);
 
-    //container with 2 replicas
     ContainerInfo containerInfo = new ContainerInfo.Builder()
         .setState(LifeCycleState.CLOSED)
         .build();
@@ -94,6 +92,16 @@
     when(containerStateManager.getContainer(anyObject()))
         .thenReturn(containerInfo);
 
+    when(containerStateManager.getContainerReplicas(new ContainerID(1L)))
+        .thenReturn(new HashSet<>(Arrays.asList(
+            listOfDatanodeDetails.get(0),
+            listOfDatanodeDetails.get(1)
+        )));
+
+
+    when(containerStateManager.getContainerReplicas(new ContainerID(3L)))
+        .thenReturn(new HashSet<>());
+
     queue = new EventQueue();
 
     trackReplicationEvents = new ArrayList<>();
@@ -104,32 +112,53 @@ public void initReplicationManager() throws IOException {
     queue.addHandler(SCMEvents.DATANODE_COMMAND,
        (event, publisher) -> copyEvents.add(event));
 
+    leaseManager = new LeaseManager<>("Test", 100000L);
+
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        containerStateManager, queue, leaseManager);
+
+
+
+  }
+
+  /**
+   * Container should be replicated but no source replicas.
+   */
+  @Test()
+  public void testNoExistingReplicas() throws InterruptedException {
+    try {
+      leaseManager.start();
+      replicationManager.start();
+
+      //WHEN
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
+              (short) 3));
+
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      //THEN
+      Assert.assertEquals(0, trackReplicationEvents.size());
+      Assert.assertEquals(0, copyEvents.size());
+
+    } finally {
+      if (leaseManager != null) {
+        leaseManager.shutdown();
+      }
+    }
   }
 
   @Test
   public void testEventSending() throws InterruptedException, IOException {
-    //GIVEN
-
-    LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 100000L);
     try {
       leaseManager.start();
 
-      ReplicationManager replicationManager =
-          new ReplicationManager(containerPlacementPolicy,
-              containerStateManager,
-              queue, leaseManager) {
-            @Override
-            protected List<DatanodeDetails> getCurrentReplicas(
-                ReplicationRequest request) throws IOException {
-              return listOfDatanodeDetails.subList(0, 2);
-            }
-          };
       replicationManager.start();
 
       //WHEN
-
       queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
          new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
              (short) 3));
@@ -138,7 +167,6 @@ protected List<DatanodeDetails> getCurrentReplicas(
       queue.processAll(1000L);
 
       //THEN
-
       Assert.assertEquals(1, trackReplicationEvents.size());
       Assert.assertEquals(1, copyEvents.size());
     } finally {
@@ -150,22 +178,14 @@ protected List<DatanodeDetails> getCurrentReplicas(
 
   @Test
   public void testCommandWatcher() throws InterruptedException, IOException {
+    LeaseManager<Long> rapidLeaseManager =
+        new LeaseManager<>("Test", 1000L);
 
-    Logger.getRootLogger().setLevel(Level.DEBUG);
-    LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 1000L);
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        containerStateManager, queue, rapidLeaseManager);
 
     try {
-      leaseManager.start();
-
-      ReplicationManager replicationManager =
-          new ReplicationManager(containerPlacementPolicy,
-              containerStateManager, queue, leaseManager) {
-            @Override
-            protected List<DatanodeDetails> getCurrentReplicas(
-                ReplicationRequest request) throws IOException {
-              return listOfDatanodeDetails.subList(0, 2);
-            }
-          };
+      rapidLeaseManager.start();
       replicationManager.start();
 
       queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
@@ -192,8 +212,8 @@ protected List<DatanodeDetails> getCurrentReplicas(
       Assert.assertEquals(2, copyEvents.size());
 
     } finally {
-      if (leaseManager != null) {
-        leaseManager.shutdown();
+      if (rapidLeaseManager != null) {
+        rapidLeaseManager.shutdown();
       }
     }
   }
@@ -209,7 +229,7 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
         ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
         PipelineID.randomId());
     pipeline.addMember(leader);
-    for (; i.hasNext(); ) {
+    while (i.hasNext()) {
      pipeline.addMember(i.next());
     }
     return pipeline;