HDDS-460. Replication manager failed to import container data. Contributed by Elek, Marton.

Nanda kumar committed 2018-09-19 23:51:50 +05:30
parent efdea85ad1
commit 042bf74d5e
10 changed files with 601 additions and 273 deletions

org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -40,7 +40,12 @@
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .ReplicateContainerCommandHandler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -69,6 +74,7 @@ public class DatanodeStateMachine implements Closeable {
   private AtomicLong nextHB;
   private Thread stateMachineThread = null;
   private Thread cmdProcessThread = null;
+  private final ReplicationSupervisor supervisor;
 
   /**
    * Constructs a datanode state machine.
@@ -89,14 +95,21 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
         new OzoneConfiguration(conf), context);
     nextHB = new AtomicLong(Time.monotonicNow());
 
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(container.getContainerSet(),
+            container.getDispatcher(),
+            new SimpleContainerDownloader(conf), new TarContainerPacker());
+
+    supervisor =
+        new ReplicationSupervisor(container.getContainerSet(), replicator, 10);
+
     // When we add new handlers just adding a new handler here should do the
     // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
         .addHandler(new CloseContainerCommandHandler())
         .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
             conf))
-        .addHandler(new ReplicateContainerCommandHandler(conf,
-            container.getContainerSet(), container.getDispatcher()))
+        .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
@@ -295,6 +308,7 @@ public DatanodeStates getNextState() {
   public void startDaemon() {
     Runnable startStateMachineTask = () -> {
       try {
+        supervisor.start();
         start();
         LOG.info("Ozone container server started.");
       } catch (Exception ex) {
@@ -323,6 +337,7 @@ public void join() throws InterruptedException {
    */
   public synchronized void stopDaemon() {
     try {
+      supervisor.stop();
       context.setState(DatanodeStates.SHUTDOWN);
       reportManager.shutdown();
       this.close();
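
The net effect of these hunks is that replication work no longer happens inline in the command handler: the state machine now owns a ReplicationSupervisor whose lifecycle is tied to startDaemon() and stopDaemon(). A minimal sketch of the same wiring outside the state machine (illustrative only, not part of the patch; it assumes an initialized OzoneContainer named container and a Configuration named conf are in scope):

    // Sketch: mirrors the constructor wiring shown in the hunk above.
    ContainerReplicator replicator =
        new DownloadAndImportReplicator(container.getContainerSet(),
            container.getDispatcher(),
            new SimpleContainerDownloader(conf), new TarContainerPacker());

    // Ten daemon worker threads consume one shared, de-duplicated queue.
    ReplicationSupervisor supervisor =
        new ReplicationSupervisor(container.getContainerSet(), replicator, 10);

    supervisor.start();  // done in startDaemon(), alongside the state machine
    // ... ReplicateContainerCommands are enqueued as ReplicationTasks ...
    supervisor.stop();   // done in stopDaemon(), before shutdown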

org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java

@@ -16,33 +16,17 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import java.io.FileInputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -58,39 +42,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
   static final Logger LOG =
       LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
 
-  private ContainerDispatcher containerDispatcher;
-
   private int invocationCount;
 
   private long totalTime;
 
-  private ContainerDownloader downloader;
-
   private Configuration conf;
 
-  private TarContainerPacker packer = new TarContainerPacker();
-
-  private ContainerSet containerSet;
-
-  private Lock lock = new ReentrantLock();
+  private ReplicationSupervisor supervisor;
 
   public ReplicateContainerCommandHandler(
       Configuration conf,
-      ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher,
-      ContainerDownloader downloader) {
+      ReplicationSupervisor supervisor) {
     this.conf = conf;
-    this.containerSet = containerSet;
-    this.downloader = downloader;
-    this.containerDispatcher = containerDispatcher;
-  }
-
-  public ReplicateContainerCommandHandler(
-      Configuration conf,
-      ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher) {
-    this(conf, containerSet, containerDispatcher,
-        new SimpleContainerDownloader(conf));
+    this.supervisor = supervisor;
   }
 
   @Override
@@ -108,72 +72,12 @@ public void handle(SCMCommand command, OzoneContainer container,
           String.format("Replication command is received for container %d "
               + "but the size of source datanodes was 0.", containerID));
 
-      LOG.info("Starting replication of container {} from {}", containerID,
-          sourceDatanodes);
-      CompletableFuture<Path> tempTarFile = downloader
-          .getContainerDataFromReplicas(containerID,
-              sourceDatanodes);
-
-      CompletableFuture<Void> result =
-          tempTarFile.thenAccept(path -> {
-            LOG.info("Container {} is downloaded, starting to import.",
-                containerID);
-            importContainer(containerID, path);
-          });
-
-      result.whenComplete((aVoid, throwable) -> {
-        if (throwable != null) {
-          LOG.error("Container replication was unsuccessful .", throwable);
-        } else {
-          LOG.info("Container {} is replicated successfully", containerID);
-        }
-      });
+      ReplicationTask replicationTask =
+          new ReplicationTask(containerID, sourceDatanodes);
+      supervisor.addTask(replicationTask);
     } finally {
       updateCommandStatus(context, command, true, LOG);
     }
   }
-
-  protected void importContainer(long containerID, Path tarFilePath) {
-    lock.lock();
-    try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        Handler handler = containerDispatcher.getHandler(
-            originalContainerData.getContainerType());
-
-        Container container = handler.importContainer(containerID,
-            originalContainerData.getMaxSize(),
-            tempContainerTarStream,
-            packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } catch (Exception e) {
-      LOG.error(
-          "Can't import the downloaded container data id=" + containerID,
-          e);
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error(
-            "Container import is failed and the downloaded file can't be "
-                + "deleted: "
-                + tarFilePath.toAbsolutePath().toString());
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
 }

org/apache/hadoop/ozone/container/replication/ContainerReplicator.java

@@ -0,0 +1,27 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.replication;

/**
 * Service to do the real replication task.
 *
 * An implementation should download the container and import it to the
 * container set.
 */
public interface ContainerReplicator {
  void replicate(ReplicationTask task);
}
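
Because the interface has a single abstract method, a stand-in implementation is cheap to write for tests (the patch itself ships a FakeReplicator in TestReplicationSupervisor below). A hypothetical example, not part of the patch:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical recording replicator: marks every task DONE so the
    // supervisor removes it from its queue on the next selectTask() pass.
    class RecordingReplicator implements ContainerReplicator {
      private final List<ReplicationTask> seen = new ArrayList<>();

      @Override
      public void replicate(ReplicationTask task) {
        seen.add(task);
        task.setStatus(ReplicationTask.Status.DONE);
      }
    }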

org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java

@@ -0,0 +1,136 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.replication;

import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default replication implementation.
 * <p>
 * This class does the real job: it executes the download and imports the
 * container into the container set.
 */
public class DownloadAndImportReplicator implements ContainerReplicator {

  private static final Logger LOG =
      LoggerFactory.getLogger(DownloadAndImportReplicator.class);

  private final ContainerSet containerSet;

  private final ContainerDispatcher containerDispatcher;

  private final ContainerDownloader downloader;

  private final TarContainerPacker packer;

  public DownloadAndImportReplicator(
      ContainerSet containerSet,
      ContainerDispatcher containerDispatcher,
      ContainerDownloader downloader,
      TarContainerPacker packer) {
    this.containerSet = containerSet;
    this.containerDispatcher = containerDispatcher;
    this.downloader = downloader;
    this.packer = packer;
  }

  public void importContainer(long containerID, Path tarFilePath) {
    try {
      ContainerData originalContainerData;
      try (FileInputStream tempContainerTarStream = new FileInputStream(
          tarFilePath.toFile())) {
        byte[] containerDescriptorYaml =
            packer.unpackContainerDescriptor(tempContainerTarStream);
        originalContainerData = ContainerDataYaml.readContainer(
            containerDescriptorYaml);
      }

      try (FileInputStream tempContainerTarStream = new FileInputStream(
          tarFilePath.toFile())) {
        Handler handler = containerDispatcher.getHandler(
            originalContainerData.getContainerType());

        Container container = handler.importContainer(containerID,
            originalContainerData.getMaxSize(),
            tempContainerTarStream,
            packer);

        containerSet.addContainer(container);
      }

    } catch (Exception e) {
      LOG.error(
          "Can't import the downloaded container data id=" + containerID,
          e);
      try {
        Files.delete(tarFilePath);
      } catch (Exception ex) {
        LOG.error(
            "Container import is failed and the downloaded file can't be "
                + "deleted: "
                + tarFilePath.toAbsolutePath().toString());
      }
    }
  }

  @Override
  public void replicate(ReplicationTask task) {
    long containerID = task.getContainerId();
    List<DatanodeDetails> sourceDatanodes = task.getSources();

    LOG.info("Starting replication of container {} from {}", containerID,
        sourceDatanodes);
    CompletableFuture<Path> tempTarFile = downloader
        .getContainerDataFromReplicas(containerID,
            sourceDatanodes);

    try {
      //wait for the download. This thread pool is limiting the parallel
      //downloads, so it's ok to block here and wait for the full download.
      Path path = tempTarFile.get();
      LOG.info("Container {} is downloaded, starting to import.",
          containerID);
      importContainer(containerID, path);
      LOG.info("Container {} is replicated successfully", containerID);
      task.setStatus(Status.DONE);
    } catch (Exception e) {
      LOG.error("Container replication was unsuccessful .", e);
      task.setStatus(Status.FAILED);
    }
  }
}
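
The switch from thenAccept/whenComplete callbacks (in the old handler) to a blocking tempTarFile.get() here is deliberate: each supervisor worker holds its thread until both the download and the import finish, so the pool size directly caps concurrent replications. A hedged sketch of driving one replication synchronously; the helper name and parameter list are illustrative, not from the patch, and assume the usual Ozone imports:

    // Sketch: run a single replication end to end on the calling thread.
    static void replicateOnce(Configuration conf, ContainerSet containerSet,
        ContainerDispatcher dispatcher, long containerId,
        List<DatanodeDetails> sources) {
      ContainerReplicator replicator = new DownloadAndImportReplicator(
          containerSet, dispatcher,
          new SimpleContainerDownloader(conf), new TarContainerPacker());
      ReplicationTask task = new ReplicationTask(containerId, sources);
      replicator.replicate(task); // blocks: gRPC download, then tar import
      // replicate() never throws; failures surface as Status.FAILED.
    }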

org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java

@@ -157,8 +157,8 @@ public void onError(Throwable throwable) {
     public void onCompleted() {
       try {
         stream.close();
-        response.complete(outputPath);
         LOG.info("Container is downloaded to {}", outputPath);
+        response.complete(outputPath);
       } catch (IOException e) {
         response.completeExceptionally(e);
       }

org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java

@@ -0,0 +1,142 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.replication;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single point to schedule the downloading tasks based on priorities.
 */
public class ReplicationSupervisor {

  private static final Logger LOG =
      LoggerFactory.getLogger(ReplicationSupervisor.class);

  private final Set<Worker> threadPool = new HashSet<>();

  private final Map<Long, ReplicationTask> queue = new TreeMap<>();

  private final ContainerSet containerSet;

  private final ContainerReplicator replicator;

  private final int poolSize;

  public ReplicationSupervisor(
      ContainerSet containerSet,
      ContainerReplicator replicator, int poolSize) {
    this.containerSet = containerSet;
    this.replicator = replicator;
    this.poolSize = poolSize;
  }

  public synchronized void addTask(ReplicationTask task) {
    queue.putIfAbsent(task.getContainerId(), task);
    synchronized (threadPool) {
      threadPool.notify();
    }
  }

  public void start() {
    for (int i = 0; i < poolSize; i++) {
      Worker worker = new Worker();
      Thread thread = new Thread(worker, "ContainerReplication-" + i);
      thread.setDaemon(true);
      thread.start();
      threadPool.add(worker);
    }
  }

  public synchronized ReplicationTask selectTask() {
    for (ReplicationTask task : queue.values()) {
      if (task.getStatus() == Status.QUEUED) {
        if (containerSet.getContainer(task.getContainerId()) == null) {
          task.setStatus(Status.DOWNLOADING);
          return task;
        } else {
          LOG.debug("Container {} has already been downloaded.",
              task.getContainerId());
          queue.remove(task.getContainerId());
        }
      } else if (task.getStatus() == Status.FAILED) {
        LOG.error(
            "Container {} can't be downloaded from any of the datanodes.",
            task.getContainerId());
        queue.remove(task.getContainerId());
      } else if (task.getStatus() == Status.DONE) {
        queue.remove(task.getContainerId());
        LOG.info("Container {} is replicated.", task.getContainerId());
      }
    }
    //no available task.
    return null;
  }

  public void stop() {
    for (Worker worker : threadPool) {
      worker.stop();
    }
  }

  @VisibleForTesting
  public int getQueueSize() {
    return queue.size();
  }

  private class Worker implements Runnable {

    private boolean running = true;

    @Override
    public void run() {
      try {
        while (running) {
          ReplicationTask task = selectTask();
          if (task == null) {
            synchronized (threadPool) {
              threadPool.wait();
            }
          } else {
            replicator.replicate(task);
          }
        }
      } catch (Exception ex) {
        LOG.error("Error on doing replication", ex);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          LOG.error("Error on waiting after failed replication task", e);
        }
      }
    }

    public void stop() {
      running = false;
    }
  }
}
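
The supervisor is a small hand-rolled producer/consumer: addTask() inserts with putIfAbsent keyed by container id, so duplicate SCM commands collapse to one entry, and wakes a worker through threadPool.notify(); selectTask() both picks the next QUEUED task and garbage-collects DONE, FAILED, and already-present containers. A usage sketch (illustrative only, assuming a List<DatanodeDetails> named sources is in scope):

    ContainerSet set = new ContainerSet();
    // ContainerReplicator has one abstract method, so a lambda suffices.
    ReplicationSupervisor supervisor = new ReplicationSupervisor(
        set, task -> task.setStatus(ReplicationTask.Status.DONE), 2);
    supervisor.start();

    supervisor.addTask(new ReplicationTask(1L, sources));
    supervisor.addTask(new ReplicationTask(1L, sources)); // collapsed: same id
    supervisor.addTask(new ReplicationTask(2L, sources));
    // At most two distinct tasks ever reach the replicator.

    supervisor.stop();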

org/apache/hadoop/ozone/container/replication/ReplicationTask.java

@@ -0,0 +1,102 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.replication;

import java.time.Instant;
import java.util.List;
import java.util.Objects;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;

/**
 * The task to download a container from the sources.
 */
public class ReplicationTask {

  private volatile Status status = Status.QUEUED;

  private final long containerId;

  private List<DatanodeDetails> sources;

  private final Instant queued = Instant.now();

  public ReplicationTask(long containerId,
      List<DatanodeDetails> sources) {
    this.containerId = containerId;
    this.sources = sources;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ReplicationTask that = (ReplicationTask) o;
    return containerId == that.containerId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(containerId);
  }

  public long getContainerId() {
    return containerId;
  }

  public List<DatanodeDetails> getSources() {
    return sources;
  }

  public Status getStatus() {
    return status;
  }

  public void setStatus(Status status) {
    this.status = status;
  }

  @Override
  public String toString() {
    return "ReplicationTask{" +
        "status=" + status +
        ", containerId=" + containerId +
        ", sources=" + sources +
        ", queued=" + queued +
        '}';
  }

  public Instant getQueued() {
    return queued;
  }

  /**
   * Status of the replication.
   */
  public enum Status {
    QUEUED,
    DOWNLOADING,
    FAILED,
    DONE
  }
}
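
Note that equals() and hashCode() consider the container id alone, matching the supervisor's TreeMap key: two tasks for the same container are the same unit of work even when they arrive with different source lists. A short illustration (the variable names are hypothetical):

    ReplicationTask first = new ReplicationTask(1L, sourcesFromFirstCommand);
    ReplicationTask second = new ReplicationTask(1L, sourcesFromSecondCommand);
    // Same container id => same task identity; the sources and the queued
    // timestamp are deliberately excluded from equality.
    assert first.equals(second) && first.hashCode() == second.hashCode();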

org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java (deleted)

@@ -1,163 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.TestGenericTestUtils;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Test replication command handler.
 */
public class TestReplicateContainerCommandHandler {

  private static final String EXCEPTION_MESSAGE = "Oh my god";

  private ReplicateContainerCommandHandler handler;

  private StubDownloader downloader;

  private ReplicateContainerCommand command;

  private List<Long> importedContainerIds;

  @Before
  public void init() {
    importedContainerIds = new ArrayList<>();

    OzoneConfiguration conf = new OzoneConfiguration();
    ContainerSet containerSet = Mockito.mock(ContainerSet.class);
    ContainerDispatcher containerDispatcher =
        Mockito.mock(ContainerDispatcher.class);

    downloader = new StubDownloader();

    handler = new ReplicateContainerCommandHandler(conf, containerSet,
        containerDispatcher, downloader) {
      @Override
      protected void importContainer(long containerID, Path tarFilePath) {
        importedContainerIds.add(containerID);
      }
    };

    //the command
    ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));

    command = new ReplicateContainerCommand(1L, datanodeDetails);
  }

  @Test
  public void handle() throws TimeoutException, InterruptedException {
    //GIVEN

    //WHEN
    handler.handle(command, null, Mockito.mock(StateContext.class), null);

    TestGenericTestUtils
        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);

    Assert.assertNotNull(downloader.futureByContainers.get(1L));
    downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));

    TestGenericTestUtils
        .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
  }

  @Test
  public void handleWithErrors() throws TimeoutException, InterruptedException {
    //GIVEN
    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
        .captureLogs(ReplicateContainerCommandHandler.LOG);

    //WHEN
    handler.handle(command, null, Mockito.mock(StateContext.class), null);

    //THEN
    TestGenericTestUtils
        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);

    Assert.assertNotNull(downloader.futureByContainers.get(1L));
    downloader.futureByContainers.get(1L)
        .completeExceptionally(new IllegalArgumentException(
            EXCEPTION_MESSAGE));

    TestGenericTestUtils
        .waitFor(() -> {
          String output = logCapturer.getOutput();
          return output.contains("unsuccessful") && output
              .contains(EXCEPTION_MESSAGE); },
            100,
            2000);
  }

  /**
   * Can't handle a command if there are no source replicas.
   */
  @Test(expected = IllegalArgumentException.class)
  public void handleWithoutReplicas()
      throws TimeoutException, InterruptedException {
    //GIVEN
    ReplicateContainerCommand commandWithoutReplicas =
        new ReplicateContainerCommand(1L, new ArrayList<>());

    //WHEN
    handler
        .handle(commandWithoutReplicas,
            null,
            Mockito.mock(StateContext.class),
            null);
  }

  private static class StubDownloader implements ContainerDownloader {

    private Map<Long, CompletableFuture<Path>> futureByContainers =
        new HashMap<>();

    @Override
    public void close() {
    }

    @Override
    public CompletableFuture<Path> getContainerDataFromReplicas(
        long containerId, List<DatanodeDetails> sources) {
      CompletableFuture<Path> future = new CompletableFuture<>();
      futureByContainers.put(containerId, future);
      return future;
    }
  }
}

org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java

@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Test the replication supervisor.
 */
public class TestReplicationSupervisor {

  private OzoneConfiguration conf = new OzoneConfiguration();

  @Test
  public void normal() {
    //GIVEN
    ContainerSet set = new ContainerSet();
    FakeReplicator replicator = new FakeReplicator(set);
    ReplicationSupervisor supervisor =
        new ReplicationSupervisor(set, replicator, 5);

    List<DatanodeDetails> datanodes = IntStream.range(1, 3)
        .mapToObj(v -> Mockito.mock(DatanodeDetails.class))
        .collect(Collectors.toList());

    try {
      supervisor.start();
      //WHEN
      supervisor.addTask(new ReplicationTask(1L, datanodes));
      supervisor.addTask(new ReplicationTask(1L, datanodes));
      supervisor.addTask(new ReplicationTask(1L, datanodes));
      supervisor.addTask(new ReplicationTask(2L, datanodes));
      supervisor.addTask(new ReplicationTask(2L, datanodes));
      supervisor.addTask(new ReplicationTask(3L, datanodes));
      try {
        Thread.sleep(300);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      //THEN
      System.out.println(replicator.replicated.get(0));

      Assert
          .assertEquals(3, replicator.replicated.size());

    } finally {
      supervisor.stop();
    }
  }

  @Test
  public void duplicateMessageAfterAWhile() throws InterruptedException {
    //GIVEN
    ContainerSet set = new ContainerSet();
    FakeReplicator replicator = new FakeReplicator(set);
    ReplicationSupervisor supervisor =
        new ReplicationSupervisor(set, replicator, 2);

    List<DatanodeDetails> datanodes = IntStream.range(1, 3)
        .mapToObj(v -> Mockito.mock(DatanodeDetails.class))
        .collect(Collectors.toList());

    try {
      supervisor.start();
      //WHEN
      supervisor.addTask(new ReplicationTask(1L, datanodes));
      Thread.sleep(400);
      supervisor.addTask(new ReplicationTask(1L, datanodes));
      Thread.sleep(300);

      //THEN
      System.out.println(replicator.replicated.get(0));

      Assert
          .assertEquals(1, replicator.replicated.size());

      //the last item is still in the queue as we cleanup the queue during the
      // selection
      Assert.assertEquals(1, supervisor.getQueueSize());

    } finally {
      supervisor.stop();
    }
  }

  private class FakeReplicator implements ContainerReplicator {

    private List<ReplicationTask> replicated = new ArrayList<>();

    private ContainerSet containerSet;

    FakeReplicator(ContainerSet set) {
      this.containerSet = set;
    }

    @Override
    public void replicate(ReplicationTask task) {
      KeyValueContainerData kvcd =
          new KeyValueContainerData(task.getContainerId(), 100L);
      KeyValueContainer kvc =
          new KeyValueContainer(kvcd, conf);
      try {
        //download is slow
        Thread.sleep(100);
        replicated.add(task);
        containerSet.addContainer(kvc);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
}

org/apache/hadoop/ozone/container/replication/package-info.java

@@ -0,0 +1,22 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/**
 * Tests for the container replication.
 */
package org.apache.hadoop.ozone.container.replication;