HDDS-432. Replication of closed containers is not working.

Contributed by Elek, Marton.
This commit is contained in:
Anu Engineer 2018-09-11 17:00:04 -07:00
parent a406f6f60e
commit 9c238ffc30
4 changed files with 105 additions and 51 deletions

View File

@ -19,11 +19,13 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto; .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -44,6 +46,7 @@
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -97,13 +100,19 @@ public void handle(SCMCommand command, OzoneContainer container,
ReplicateContainerCommand replicateCommand = ReplicateContainerCommand replicateCommand =
(ReplicateContainerCommand) command; (ReplicateContainerCommand) command;
try { try {
List<DatanodeDetails> sourceDatanodes =
replicateCommand.getSourceDatanodes();
long containerID = replicateCommand.getContainerID(); long containerID = replicateCommand.getContainerID();
Preconditions.checkArgument(sourceDatanodes.size() > 0,
String.format("Replication command is received for container %d "
+ "but the size of source datanodes was 0.", containerID));
LOG.info("Starting replication of container {} from {}", containerID, LOG.info("Starting replication of container {} from {}", containerID,
replicateCommand.getSourceDatanodes()); sourceDatanodes);
CompletableFuture<Path> tempTarFile = downloader CompletableFuture<Path> tempTarFile = downloader
.getContainerDataFromReplicas(containerID, .getContainerDataFromReplicas(containerID,
replicateCommand.getSourceDatanodes()); sourceDatanodes);
CompletableFuture<Void> result = CompletableFuture<Void> result =
tempTarFile.thenAccept(path -> { tempTarFile.thenAccept(path -> {

View File

@ -106,7 +106,6 @@ public void handleWithErrors() throws TimeoutException, InterruptedException {
handler.handle(command, null, Mockito.mock(StateContext.class), null); handler.handle(command, null, Mockito.mock(StateContext.class), null);
//THEN //THEN
TestGenericTestUtils TestGenericTestUtils
.waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000); .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
@ -124,6 +123,24 @@ public void handleWithErrors() throws TimeoutException, InterruptedException {
2000); 2000);
} }
/**
 * A replication command that lists no source datanodes must be rejected:
 * the test passes only if handling it raises an IllegalArgumentException.
 */
@Test(expected = IllegalArgumentException.class)
public void handleWithoutReplicas()
    throws TimeoutException, InterruptedException {
  //GIVEN: a command for container 1 with an empty list of sources
  final ReplicateContainerCommand emptySourceCommand =
      new ReplicateContainerCommand(1L, new ArrayList<>());
  final StateContext stateContext = Mockito.mock(StateContext.class);

  //WHEN: the handler receives it; the expected exception is the assertion
  handler.handle(emptySourceCommand, null, stateContext, null);
}
private static class StubDownloader implements ContainerDownloader { private static class StubDownloader implements ContainerDownloader {
private Map<Long, CompletableFuture<Path>> futureByContainers = private Map<Long, CompletableFuture<Path>> futureByContainers =

View File

@ -20,6 +20,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -116,7 +117,15 @@ public void run() {
//check the current replication //check the current replication
List<DatanodeDetails> datanodesWithReplicas = List<DatanodeDetails> datanodesWithReplicas =
getCurrentReplicas(request); new ArrayList<>(getCurrentReplicas(request));
if (datanodesWithReplicas.size() == 0) {
LOG.warn(
"Container {} should be replicated but can't find any existing "
+ "replicas",
containerID);
return;
}
ReplicationRequest finalRequest = request; ReplicationRequest finalRequest = request;
@ -165,11 +174,10 @@ public void run() {
} }
@VisibleForTesting @VisibleForTesting
protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request) protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
throws IOException { throws IOException {
//TODO: replication information is not yet available after HDDS-175, return containerStateManager
// should be fixed after HDDS-228 .getContainerReplicas(new ContainerID(request.getContainerId()));
return new ArrayList<>();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -18,6 +18,8 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -26,27 +28,22 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationRequestToRepeat;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.events.SCMEvents import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
.TRACK_REPLICATE_COMMAND;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -69,6 +66,8 @@ public class TestReplicationManager {
private ContainerPlacementPolicy containerPlacementPolicy; private ContainerPlacementPolicy containerPlacementPolicy;
private List<DatanodeDetails> listOfDatanodeDetails; private List<DatanodeDetails> listOfDatanodeDetails;
private LeaseManager<Long> leaseManager;
private ReplicationManager replicationManager;
@Before @Before
public void initReplicationManager() throws IOException { public void initReplicationManager() throws IOException {
@ -86,7 +85,6 @@ public void initReplicationManager() throws IOException {
containerStateManager = Mockito.mock(ContainerStateManager.class); containerStateManager = Mockito.mock(ContainerStateManager.class);
//container with 2 replicas
ContainerInfo containerInfo = new ContainerInfo.Builder() ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(LifeCycleState.CLOSED) .setState(LifeCycleState.CLOSED)
.build(); .build();
@ -94,6 +92,16 @@ public void initReplicationManager() throws IOException {
when(containerStateManager.getContainer(anyObject())) when(containerStateManager.getContainer(anyObject()))
.thenReturn(containerInfo); .thenReturn(containerInfo);
when(containerStateManager.getContainerReplicas(new ContainerID(1L)))
.thenReturn(new HashSet<>(Arrays.asList(
listOfDatanodeDetails.get(0),
listOfDatanodeDetails.get(1)
)));
when(containerStateManager.getContainerReplicas(new ContainerID(3L)))
.thenReturn(new HashSet<>());
queue = new EventQueue(); queue = new EventQueue();
trackReplicationEvents = new ArrayList<>(); trackReplicationEvents = new ArrayList<>();
@ -104,32 +112,53 @@ public void initReplicationManager() throws IOException {
queue.addHandler(SCMEvents.DATANODE_COMMAND, queue.addHandler(SCMEvents.DATANODE_COMMAND,
(event, publisher) -> copyEvents.add(event)); (event, publisher) -> copyEvents.add(event));
leaseManager = new LeaseManager<>("Test", 100000L);
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerStateManager, queue, leaseManager);
}
/**
 * Container should be replicated but there are no source replicas.
 *
 * <p>Fires a replication request for container 3, for which the mocked
 * ContainerStateManager is set up to return an empty replica set, and
 * verifies that nothing was collected in trackReplicationEvents or in
 * copyEvents (the latter is filled by the DATANODE_COMMAND handler).
 *
 * @throws InterruptedException declared because of the Thread.sleep call
 */
@Test()
public void testNoExistingReplicas() throws InterruptedException {
try {
// Both managers are created in initReplicationManager; start them here.
leaseManager.start();
replicationManager.start();
//WHEN
// Request replication of container 3 (the one mocked with no replicas).
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
(short) 3));
// NOTE(review): presumably this pause gives the replication manager time
// to pick up the request before the queue is drained - confirm.
Thread.sleep(500L);
queue.processAll(1000L);
//THEN
// No tracking event and no datanode command may have been recorded.
Assert.assertEquals(0, trackReplicationEvents.size());
Assert.assertEquals(0, copyEvents.size());
} finally {
// Always stop the lease manager, even when an assertion above fails.
if (leaseManager != null) {
leaseManager.shutdown();
}
}
} }
@Test @Test
public void testEventSending() throws InterruptedException, IOException { public void testEventSending() throws InterruptedException, IOException {
//GIVEN //GIVEN
LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 100000L);
try { try {
leaseManager.start(); leaseManager.start();
ReplicationManager replicationManager =
new ReplicationManager(containerPlacementPolicy,
containerStateManager,
queue, leaseManager) {
@Override
protected List<DatanodeDetails> getCurrentReplicas(
ReplicationRequest request) throws IOException {
return listOfDatanodeDetails.subList(0, 2);
}
};
replicationManager.start(); replicationManager.start();
//WHEN //WHEN
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(), new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
(short) 3)); (short) 3));
@ -138,7 +167,6 @@ protected List<DatanodeDetails> getCurrentReplicas(
queue.processAll(1000L); queue.processAll(1000L);
//THEN //THEN
Assert.assertEquals(1, trackReplicationEvents.size()); Assert.assertEquals(1, trackReplicationEvents.size());
Assert.assertEquals(1, copyEvents.size()); Assert.assertEquals(1, copyEvents.size());
} finally { } finally {
@ -150,22 +178,14 @@ protected List<DatanodeDetails> getCurrentReplicas(
@Test @Test
public void testCommandWatcher() throws InterruptedException, IOException { public void testCommandWatcher() throws InterruptedException, IOException {
LeaseManager<Long> rapidLeaseManager =
new LeaseManager<>("Test", 1000L);
Logger.getRootLogger().setLevel(Level.DEBUG); replicationManager = new ReplicationManager(containerPlacementPolicy,
LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 1000L); containerStateManager, queue, rapidLeaseManager);
try { try {
leaseManager.start(); rapidLeaseManager.start();
ReplicationManager replicationManager =
new ReplicationManager(containerPlacementPolicy,
containerStateManager, queue, leaseManager) {
@Override
protected List<DatanodeDetails> getCurrentReplicas(
ReplicationRequest request) throws IOException {
return listOfDatanodeDetails.subList(0, 2);
}
};
replicationManager.start(); replicationManager.start();
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
@ -192,8 +212,8 @@ protected List<DatanodeDetails> getCurrentReplicas(
Assert.assertEquals(2, copyEvents.size()); Assert.assertEquals(2, copyEvents.size());
} finally { } finally {
if (leaseManager != null) { if (rapidLeaseManager != null) {
leaseManager.shutdown(); rapidLeaseManager.shutdown();
} }
} }
} }
@ -209,7 +229,7 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId()); PipelineID.randomId());
pipeline.addMember(leader); pipeline.addMember(leader);
for (; i.hasNext(); ) { while (i.hasNext()) {
pipeline.addMember(i.next()); pipeline.addMember(i.next());
} }
return pipeline; return pipeline;