diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 875d0638d5..1bade8e279 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -40,7 +40,12 @@ .DeleteBlocksCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .ReplicateContainerCommandHandler; +import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ContainerReplicator; +import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; +import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -69,6 +74,7 @@ public class DatanodeStateMachine implements Closeable { private AtomicLong nextHB; private Thread stateMachineThread = null; private Thread cmdProcessThread = null; + private final ReplicationSupervisor supervisor; /** * Constructs a a datanode state machine. 
@@ -89,14 +95,21 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, new OzoneConfiguration(conf), context); nextHB = new AtomicLong(Time.monotonicNow()); + ContainerReplicator replicator = + new DownloadAndImportReplicator(container.getContainerSet(), + container.getDispatcher(), + new SimpleContainerDownloader(conf), new TarContainerPacker()); + + supervisor = + new ReplicationSupervisor(container.getContainerSet(), replicator, 10); + // When we add new handlers just adding a new handler here should do the // trick. commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) - .addHandler(new ReplicateContainerCommandHandler(conf, - container.getContainerSet(), container.getDispatcher())) + .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) @@ -295,6 +308,7 @@ public DatanodeStates getNextState() { public void startDaemon() { Runnable startStateMachineTask = () -> { try { + supervisor.start(); start(); LOG.info("Ozone container server started."); } catch (Exception ex) { @@ -323,6 +337,7 @@ public void join() throws InterruptedException { */ public synchronized void stopDaemon() { try { + supervisor.stop(); context.setState(DatanodeStates.SHUTDOWN); reportManager.shutdown(); this.close(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index cb677c272c..09c379fd10 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -16,33 +16,17 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import java.io.FileInputStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.container.replication.ContainerDownloader; -import 
org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; +import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -58,39 +42,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler { static final Logger LOG = LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); - private ContainerDispatcher containerDispatcher; - private int invocationCount; private long totalTime; - private ContainerDownloader downloader; - private Configuration conf; - private TarContainerPacker packer = new TarContainerPacker(); - - private ContainerSet containerSet; - - private Lock lock = new ReentrantLock(); + private ReplicationSupervisor supervisor; public ReplicateContainerCommandHandler( Configuration conf, - ContainerSet containerSet, - ContainerDispatcher containerDispatcher, - ContainerDownloader downloader) { + ReplicationSupervisor supervisor) { this.conf = conf; - this.containerSet = containerSet; - this.downloader = downloader; - this.containerDispatcher = containerDispatcher; - } - - public ReplicateContainerCommandHandler( - Configuration conf, - ContainerSet containerSet, - ContainerDispatcher containerDispatcher) { - this(conf, containerSet, containerDispatcher, - new SimpleContainerDownloader(conf)); + this.supervisor = supervisor; } @Override @@ -108,72 +72,12 @@ public void handle(SCMCommand command, OzoneContainer container, String.format("Replication command is received for container %d " + "but the size of source datanodes was 0.", containerID)); - LOG.info("Starting replication of container {} from {}", containerID, - sourceDatanodes); - CompletableFuture tempTarFile = downloader - .getContainerDataFromReplicas(containerID, - sourceDatanodes); + ReplicationTask replicationTask = + new 
ReplicationTask(containerID, sourceDatanodes); + supervisor.addTask(replicationTask); - CompletableFuture result = - tempTarFile.thenAccept(path -> { - LOG.info("Container {} is downloaded, starting to import.", - containerID); - importContainer(containerID, path); - }); - - result.whenComplete((aVoid, throwable) -> { - if (throwable != null) { - LOG.error("Container replication was unsuccessful .", throwable); - } else { - LOG.info("Container {} is replicated successfully", containerID); - } - }); } finally { updateCommandStatus(context, command, true, LOG); - - } - } - - protected void importContainer(long containerID, Path tarFilePath) { - lock.lock(); - try { - ContainerData originalContainerData; - try (FileInputStream tempContainerTarStream = new FileInputStream( - tarFilePath.toFile())) { - byte[] containerDescriptorYaml = - packer.unpackContainerDescriptor(tempContainerTarStream); - originalContainerData = ContainerDataYaml.readContainer( - containerDescriptorYaml); - } - - try (FileInputStream tempContainerTarStream = new FileInputStream( - tarFilePath.toFile())) { - - Handler handler = containerDispatcher.getHandler( - originalContainerData.getContainerType()); - - Container container = handler.importContainer(containerID, - originalContainerData.getMaxSize(), - tempContainerTarStream, - packer); - - containerSet.addContainer(container); - } - - } catch (Exception e) { - LOG.error( - "Can't import the downloaded container data id=" + containerID, - e); - try { - Files.delete(tarFilePath); - } catch (Exception ex) { - LOG.error( - "Container import is failed and the downloaded file can't be " - + "deleted: " - + tarFilePath.toAbsolutePath().toString()); - } - } finally { - lock.unlock(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java new file mode 100644 
index 0000000000..827b9d69e8 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +/** + * Service to do the real replication task. + * + * An implementation should download and import the container. + */ +public interface ContainerReplicator { + void replicate(ReplicationTask task); +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java new file mode 100644 index 0000000000..5ef584184a --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.io.FileInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; +import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default replication implementation. + *

+ * This class does the real job. Executes the download and import the container + * to the container set. + */ +public class DownloadAndImportReplicator implements ContainerReplicator { + + private static final Logger LOG = + LoggerFactory.getLogger(DownloadAndImportReplicator.class); + + private final ContainerSet containerSet; + + private final ContainerDispatcher containerDispatcher; + + private final ContainerDownloader downloader; + + private final TarContainerPacker packer; + + public DownloadAndImportReplicator( + ContainerSet containerSet, + ContainerDispatcher containerDispatcher, + ContainerDownloader downloader, + TarContainerPacker packer) { + this.containerSet = containerSet; + this.containerDispatcher = containerDispatcher; + this.downloader = downloader; + this.packer = packer; + } + + public void importContainer(long containerID, Path tarFilePath) { + try { + ContainerData originalContainerData; + try (FileInputStream tempContainerTarStream = new FileInputStream( + tarFilePath.toFile())) { + byte[] containerDescriptorYaml = + packer.unpackContainerDescriptor(tempContainerTarStream); + originalContainerData = ContainerDataYaml.readContainer( + containerDescriptorYaml); + } + + try (FileInputStream tempContainerTarStream = new FileInputStream( + tarFilePath.toFile())) { + + Handler handler = containerDispatcher.getHandler( + originalContainerData.getContainerType()); + + Container container = handler.importContainer(containerID, + originalContainerData.getMaxSize(), + tempContainerTarStream, + packer); + + containerSet.addContainer(container); + } + + } catch (Exception e) { + LOG.error( + "Can't import the downloaded container data id=" + containerID, + e); + try { + Files.delete(tarFilePath); + } catch (Exception ex) { + LOG.error( + "Container import is failed and the downloaded file can't be " + + "deleted: " + + tarFilePath.toAbsolutePath().toString()); + } + } + } + + @Override + public void replicate(ReplicationTask task) { + long containerID 
= task.getContainerId(); + + List sourceDatanodes = task.getSources(); + + LOG.info("Starting replication of container {} from {}", containerID, + sourceDatanodes); + + CompletableFuture tempTarFile = downloader + .getContainerDataFromReplicas(containerID, + sourceDatanodes); + + try { + //wait for the download. This thread pool is limiting the parallel + //downloads, so it's ok to block here and wait for the full download. + Path path = tempTarFile.get(); + LOG.info("Container {} is downloaded, starting to import.", + containerID); + importContainer(containerID, path); + LOG.info("Container {} is replicated successfully", containerID); + task.setStatus(Status.DONE); + } catch (Exception e) { + LOG.error("Container replication was unsuccessful .", e); + task.setStatus(Status.FAILED); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java index 91d098f0b0..3aafb0cb0e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java @@ -157,8 +157,8 @@ public void onError(Throwable throwable) { public void onCompleted() { try { stream.close(); - response.complete(outputPath); LOG.info("Container is downloaded to {}", outputPath); + response.complete(outputPath); } catch (IOException e) { response.completeExceptionally(e); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java new file mode 100644 index 0000000000..1d8d5f655e --- /dev/null +++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Single point to schedule the downloading tasks based on priorities. + */ +public class ReplicationSupervisor { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationSupervisor.class); + + private final Set threadPool = new HashSet<>(); + + private final Map queue = new TreeMap(); + + private final ContainerSet containerSet; + + private final ContainerReplicator replicator; + + private final int poolSize; + + public ReplicationSupervisor( + ContainerSet containerSet, + ContainerReplicator replicator, int poolSize) { + this.containerSet = containerSet; + this.replicator = replicator; + this.poolSize = poolSize; + } + + public synchronized void addTask(ReplicationTask task) { + queue.putIfAbsent(task.getContainerId(), task); + synchronized (threadPool) { + threadPool.notify(); + } + } + + public void start() { + for (int i = 0; i < poolSize; i++) { + Worker worker = new Worker(); + Thread thread = new Thread(worker, "ContainerReplication-" + i); + thread.setDaemon(true); + thread.start(); + threadPool.add(worker); + } + } + + public synchronized ReplicationTask selectTask() { + for (ReplicationTask task : queue.values()) { + if (task.getStatus() == Status.QUEUED) { + if 
(containerSet.getContainer(task.getContainerId()) == null) { + task.setStatus(Status.DOWNLOADING); + return task; + } else { + LOG.debug("Container {} has already been downloaded.", + task.getContainerId()); + queue.remove(task.getContainerId()); + } + } else if (task.getStatus() == Status.FAILED) { + LOG.error( + "Container {} can't be downloaded from any of the datanodes.", + task.getContainerId()); + queue.remove(task.getContainerId()); + } else if (task.getStatus() == Status.DONE) { + queue.remove(task.getContainerId()); + LOG.info("Container {} is replicated.", task.getContainerId()); + } + } + //no available task. + return null; + } + + public void stop() { + for (Worker worker : threadPool) { + worker.stop(); + } + } + + @VisibleForTesting + public int getQueueSize() { + return queue.size(); + } + + private class Worker implements Runnable { + + private boolean running = true; + + @Override + public void run() { + try { + while (running) { + ReplicationTask task = selectTask(); + if (task == null) { + synchronized (threadPool) { + threadPool.wait(); + } + } else { + replicator.replicate(task); + } + } + } catch (Exception ex) { + LOG.error("Error on doing replication", ex); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + LOG.error("Error on waiting after failed replication task", e); + } + } + } + + public void stop() { + running = false; + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java new file mode 100644 index 0000000000..90198110b5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.time.Instant; +import java.util.List; +import java.util.Objects; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +/** + * The task to download a container from the sources. + */ +public class ReplicationTask { + + private volatile Status status = Status.QUEUED; + + private final long containerId; + + private List sources; + + private final Instant queued = Instant.now(); + + public ReplicationTask(long containerId, + List sources) { + this.containerId = containerId; + this.sources = sources; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationTask that = (ReplicationTask) o; + return containerId == that.containerId; + } + + @Override + public int hashCode() { + return Objects.hash(containerId); + } + + public long getContainerId() { + return containerId; + } + + public List getSources() { + return sources; + } + + public Status getStatus() { + return status; + } + + public void setStatus( + Status status) { + this.status = status; + } + + @Override + public String toString() { + return "ReplicationTask{" + + "status=" + status + + ", containerId=" + containerId + + ", sources=" + sources + + ", queued=" + queued + + '}'; + } + + public Instant getQueued() { + return queued; + } + + /** + * Status of the replication. 
+ */ + public enum Status { + QUEUED, + DOWNLOADING, + FAILED, + DONE + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java deleted file mode 100644 index 6529922fa5..0000000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.replication.ContainerDownloader; -import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.TestGenericTestUtils; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Test replication command handler. 
- */ -public class TestReplicateContainerCommandHandler { - - private static final String EXCEPTION_MESSAGE = "Oh my god"; - private ReplicateContainerCommandHandler handler; - private StubDownloader downloader; - private ReplicateContainerCommand command; - private List importedContainerIds; - - @Before - public void init() { - importedContainerIds = new ArrayList<>(); - - OzoneConfiguration conf = new OzoneConfiguration(); - ContainerSet containerSet = Mockito.mock(ContainerSet.class); - ContainerDispatcher containerDispatcher = - Mockito.mock(ContainerDispatcher.class); - - downloader = new StubDownloader(); - - handler = new ReplicateContainerCommandHandler(conf, containerSet, - containerDispatcher, downloader) { - @Override - protected void importContainer(long containerID, Path tarFilePath) { - importedContainerIds.add(containerID); - } - }; - - //the command - ArrayList datanodeDetails = new ArrayList<>(); - datanodeDetails.add(Mockito.mock(DatanodeDetails.class)); - datanodeDetails.add(Mockito.mock(DatanodeDetails.class)); - - command = new ReplicateContainerCommand(1L, datanodeDetails); - } - - @Test - public void handle() throws TimeoutException, InterruptedException { - //GIVEN - - //WHEN - handler.handle(command, null, Mockito.mock(StateContext.class), null); - - TestGenericTestUtils - .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000); - - Assert.assertNotNull(downloader.futureByContainers.get(1L)); - downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test")); - - TestGenericTestUtils - .waitFor(() -> importedContainerIds.size() == 1, 100, 2000); - } - - @Test - public void handleWithErrors() throws TimeoutException, InterruptedException { - //GIVEN - GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer - .captureLogs(ReplicateContainerCommandHandler.LOG); - - //WHEN - handler.handle(command, null, Mockito.mock(StateContext.class), null); - - //THEN - TestGenericTestUtils - .waitFor(() -> 
downloader.futureByContainers.size() == 1, 100, 2000); - - Assert.assertNotNull(downloader.futureByContainers.get(1L)); - downloader.futureByContainers.get(1L) - .completeExceptionally(new IllegalArgumentException( - EXCEPTION_MESSAGE)); - - TestGenericTestUtils - .waitFor(() -> { - String output = logCapturer.getOutput(); - return output.contains("unsuccessful") && output - .contains(EXCEPTION_MESSAGE); }, - 100, - 2000); - } - - /** - * Can't handle a command if there are no source replicas. - */ - @Test(expected = IllegalArgumentException.class) - public void handleWithoutReplicas() - throws TimeoutException, InterruptedException { - //GIVEN - ReplicateContainerCommand commandWithoutReplicas = - new ReplicateContainerCommand(1L, new ArrayList<>()); - - //WHEN - handler - .handle(commandWithoutReplicas, - null, - Mockito.mock(StateContext.class), - null); - - } - private static class StubDownloader implements ContainerDownloader { - - private Map> futureByContainers = - new HashMap<>(); - - @Override - public void close() { - - } - - @Override - public CompletableFuture getContainerDataFromReplicas( - long containerId, List sources) { - CompletableFuture future = new CompletableFuture<>(); - futureByContainers.put(containerId, future); - return future; - } - } - -} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java new file mode 100644 index 0000000000..d433319fa7 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test the replication supervisor. 
+ */ +public class TestReplicationSupervisor { + + private OzoneConfiguration conf = new OzoneConfiguration(); + + @Test + public void normal() { + //GIVEN + ContainerSet set = new ContainerSet(); + + FakeReplicator replicator = new FakeReplicator(set); + ReplicationSupervisor supervisor = + new ReplicationSupervisor(set, replicator, 5); + + List datanodes = IntStream.range(1, 3) + .mapToObj(v -> Mockito.mock(DatanodeDetails.class)) + .collect(Collectors.toList()); + + try { + supervisor.start(); + //WHEN + supervisor.addTask(new ReplicationTask(1L, datanodes)); + supervisor.addTask(new ReplicationTask(1L, datanodes)); + supervisor.addTask(new ReplicationTask(1L, datanodes)); + supervisor.addTask(new ReplicationTask(2L, datanodes)); + supervisor.addTask(new ReplicationTask(2L, datanodes)); + supervisor.addTask(new ReplicationTask(3L, datanodes)); + try { + Thread.sleep(300); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //THEN + System.out.println(replicator.replicated.get(0)); + + Assert + .assertEquals(3, replicator.replicated.size()); + + } finally { + supervisor.stop(); + } + } + + @Test + public void duplicateMessageAfterAWhile() throws InterruptedException { + //GIVEN + ContainerSet set = new ContainerSet(); + + FakeReplicator replicator = new FakeReplicator(set); + ReplicationSupervisor supervisor = + new ReplicationSupervisor(set, replicator, 2); + + List datanodes = IntStream.range(1, 3) + .mapToObj(v -> Mockito.mock(DatanodeDetails.class)) + .collect(Collectors.toList()); + + try { + supervisor.start(); + //WHEN + supervisor.addTask(new ReplicationTask(1L, datanodes)); + Thread.sleep(400); + supervisor.addTask(new ReplicationTask(1L, datanodes)); + Thread.sleep(300); + + //THEN + System.out.println(replicator.replicated.get(0)); + + Assert + .assertEquals(1, replicator.replicated.size()); + + //the last item is still in the queue as we cleanup the queue during the + // selection + Assert.assertEquals(1, supervisor.getQueueSize()); + + 
} finally { + supervisor.stop(); + } + } + + private class FakeReplicator implements ContainerReplicator { + + private List replicated = new ArrayList<>(); + + private ContainerSet containerSet; + + FakeReplicator(ContainerSet set) { + this.containerSet = set; + } + + @Override + public void replicate(ReplicationTask task) { + KeyValueContainerData kvcd = + new KeyValueContainerData(task.getContainerId(), 100L); + KeyValueContainer kvc = + new KeyValueContainer(kvcd, conf); + try { + //download is slow + Thread.sleep(100); + replicated.add(task); + containerSet.addContainer(kvc); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java new file mode 100644 index 0000000000..5c905e0287 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * Tests for the container replication. 
+ */ +package org.apache.hadoop.ozone.container.replication; \ No newline at end of file