sources);
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
new file mode 100644
index 0000000000..69582f799f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Contract to prepare and provide the container data in binary form.
+ *
+ * Prepare will be called when a container is closed. An implementation may
+ * pre-cache the binary representation of a container and store the
+ * pre-packed images.
+ */
+public interface ContainerReplicationSource {
+
+ /**
+ * Prepare for the replication.
+ *
+   * @param containerId The ID of the container to package.
+ */
+ void prepare(long containerId);
+
+ /**
+ * Copy the container data to an output stream.
+ *
+ * @param containerId Container to replicate
+   * @param destination The destination stream to copy the container data to.
+   * @throws IOException in case of any IO error
+ */
+ void copyData(long containerId, OutputStream destination)
+ throws IOException;
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
new file mode 100644
index 0000000000..f7fd8a4957
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * JAX-RS streaming output to return the binary container data.
+ */
+public class ContainerStreamingOutput implements StreamingOutput {
+
+ private long containerId;
+
+ private ContainerReplicationSource containerReplicationSource;
+
+ public ContainerStreamingOutput(long containerId,
+ ContainerReplicationSource containerReplicationSource) {
+ this.containerId = containerId;
+ this.containerReplicationSource = containerReplicationSource;
+ }
+
+ @Override
+ public void write(OutputStream outputStream)
+ throws IOException, WebApplicationException {
+ containerReplicationSource.copyData(containerId, outputStream);
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
new file mode 100644
index 0000000000..91d098f0b0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.base.Preconditions;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to read container data over gRPC.
+ */
+public class GrpcReplicationClient {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcReplicationClient.class);
+
+ private final ManagedChannel channel;
+
+ private final IntraDatanodeProtocolServiceStub client;
+
+ private final Path workingDirectory;
+
+ public GrpcReplicationClient(String host,
+ int port, Path workingDir) {
+
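+    //Accept responses up to the maximum configured container chunk size.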
+ channel = NettyChannelBuilder.forAddress(host, port)
+ .usePlaintext()
+ .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .build();
+ client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
+ this.workingDirectory = workingDir;
+
+ }
+
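+  /**
+   * Downloads the container archive from the remote datanode into the
+   * working directory. The returned future completes with the path of the
+   * downloaded file.
+   */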
+  public CompletableFuture<Path> download(long containerId) {
+ CopyContainerRequestProto request =
+ CopyContainerRequestProto.newBuilder()
+ .setContainerID(containerId)
+ .setLen(-1)
+ .setReadOffset(0)
+ .build();
+
+    CompletableFuture<Path> response = new CompletableFuture<>();
+
+ Path destinationPath =
+ getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
+
+ client.download(request,
+ new StreamDownloader(containerId, response, destinationPath));
+ return response;
+ }
+
+ private Path getWorkingDirectory() {
+ return workingDirectory;
+ }
+
+ public void shutdown() {
+ channel.shutdown();
+ }
+
+ /**
+   * Adapter from the gRPC stream observer to a CompletableFuture.
+ */
+ public static class StreamDownloader
+      implements StreamObserver<CopyContainerResponseProto> {
+
+    private final CompletableFuture<Path> response;
+
+ private final long containerId;
+
+ private BufferedOutputStream stream;
+
+ private Path outputPath;
+
+    public StreamDownloader(long containerId,
+        CompletableFuture<Path> response, Path outputPath) {
+      this.response = response;
+      this.containerId = containerId;
+      this.outputPath = Preconditions.checkNotNull(outputPath);
+      try {
+        Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
+        Files.createDirectories(parentPath);
+        stream =
+            new BufferedOutputStream(new FileOutputStream(outputPath.toFile()));
+      } catch (IOException e) {
+        throw new RuntimeException("OutputPath can't be used: " + outputPath,
+            e);
+      }
+    }
+
+ @Override
+ public void onNext(CopyContainerResponseProto chunk) {
+ try {
+ stream.write(chunk.getData().toByteArray());
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ stream.close();
+ LOG.error("Container download was unsuccessfull", throwable);
+ try {
+ Files.delete(outputPath);
+ } catch (IOException ex) {
+          LOG.error("The download failed and the temporary destination "
+              + "file could not be deleted.", ex);
+ }
+ response.completeExceptionally(throwable);
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ try {
+ stream.close();
+ response.complete(outputPath);
+ LOG.info("Container is downloaded to {}", outputPath);
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+
+ }
+ }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
new file mode 100644
index 0000000000..d8f696f47d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to make containers available for replication.
+ */
+public class GrpcReplicationService extends
+ IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcReplicationService.class);
+
+ private final ContainerReplicationSource containerReplicationSource;
+
+ public GrpcReplicationService(
+ ContainerReplicationSource containerReplicationSource) {
+ this.containerReplicationSource = containerReplicationSource;
+ }
+
+ @Override
+ public void download(CopyContainerRequestProto request,
+      StreamObserver<CopyContainerResponseProto> responseObserver) {
+ LOG.info("Streaming container data ({}) to other datanode",
+ request.getContainerID());
+ try {
+ GrpcOutputStream outputStream =
+ new GrpcOutputStream(responseObserver, request.getContainerID());
+ containerReplicationSource
+ .copyData(request.getContainerID(), outputStream);
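+      //copyData is expected to close the stream, which flushes the last
+      //chunk and completes the rpc response.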
+
+ } catch (IOException e) {
+ LOG.error("Can't stream the container data", e);
+ responseObserver.onError(e);
+ }
+ }
+
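+  /**
+   * OutputStream adapter which buffers the written bytes and publishes them
+   * to the gRPC response observer in at most 1 MB chunks.
+   */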
+ private static class GrpcOutputStream extends OutputStream
+ implements Closeable {
+
+ private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024;
+
+    private final StreamObserver<CopyContainerResponseProto>
+        responseObserver;
+
+ private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ private long containerId;
+
+ private int readOffset = 0;
+
+ private int writtenBytes;
+
+ GrpcOutputStream(
+        StreamObserver<CopyContainerResponseProto> responseObserver,
+ long containerId) {
+ this.responseObserver = responseObserver;
+ this.containerId = containerId;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ buffer.write(b);
+ if (buffer.size() > BUFFER_SIZE_IN_BYTES) {
+ flushBuffer(false);
+ }
+ } catch (Exception ex) {
+ responseObserver.onError(ex);
+ }
+ }
+
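+    /**
+     * Sends the buffered bytes to the response observer as the next chunk
+     * and advances the offset counters; eof marks the final chunk.
+     */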
+ private void flushBuffer(boolean eof) {
+ if (buffer.size() > 0) {
+ CopyContainerResponseProto response =
+ CopyContainerResponseProto.newBuilder()
+ .setContainerID(containerId)
+ .setData(ByteString.copyFrom(buffer.toByteArray()))
+ .setEof(eof)
+ .setReadOffset(readOffset)
+ .setLen(buffer.size())
+ .build();
+ responseObserver.onNext(response);
+ readOffset += buffer.size();
+ writtenBytes += buffer.size();
+ buffer.reset();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushBuffer(true);
+ LOG.info("{} bytes written to the rpc stream from container {}",
+ writtenBytes, containerId);
+ responseObserver.onCompleted();
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
new file mode 100644
index 0000000000..d557b548b4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A naive implementation of the replication source which creates the tar
+ * file on demand, without pre-creating the compressed archives.
+ */
+public class OnDemandContainerReplicationSource
+ implements ContainerReplicationSource {
+
+ private static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerReplicationSource.class);
+
+ private ContainerSet containerSet;
+
+ private ContainerPacker packer = new TarContainerPacker();
+
+ public OnDemandContainerReplicationSource(
+ ContainerSet containerSet) {
+ this.containerSet = containerSet;
+ }
+
+ @Override
+ public void prepare(long containerId) {
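+    //no-op: this implementation packs the container on demand in copyData.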
+
+ }
+
+ @Override
+ public void copyData(long containerId, OutputStream destination)
+ throws IOException {
+
+ Container container = containerSet.getContainer(containerId);
+
+    Preconditions
+        .checkNotNull(container, "Container not found: " + containerId);
+
+    switch (container.getContainerType()) {
+    case KeyValueContainer:
+      packer.pack(container, destination);
+      break;
+    default:
+      LOG.warn("Container type " + container.getContainerType()
+          + " is not replicable: no packer is defined for this type.");
+    }
+
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
new file mode 100644
index 0000000000..a461a98f23
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple ContainerDownloader implementation to download the missing
+ * container from the first available datanode.
+ *
+ * This is not the most efficient implementation as it uses only one source
+ * for the container download.
+ */
+public class SimpleContainerDownloader implements ContainerDownloader {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SimpleContainerDownloader.class);
+
+ private final Path workingDirectory;
+
+ private ExecutorService executor;
+
+ public SimpleContainerDownloader(Configuration conf) {
+
+ String workDirString =
+ conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
+
+ if (workDirString == null) {
+ workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
+ .resolve("container-copy");
+ } else {
+ workingDirectory = Paths.get(workDirString);
+ }
+
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Container downloader thread - %d").build();
+ executor = Executors.newSingleThreadExecutor(build);
+ LOG.info("Starting container downloader service to copy "
+ + "containers to replicate.");
+ }
+
+ @Override
+  public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+      List<DatanodeDetails> sourceDatanodes) {
+
+    CompletableFuture<Path> result = null;
+ for (DatanodeDetails datanode : sourceDatanodes) {
+ try {
+
+ if (result == null) {
+ GrpcReplicationClient grpcReplicationClient =
+ new GrpcReplicationClient(datanode.getIpAddress(),
+ datanode.getPort(Name.STANDALONE).getValue(),
+ workingDirectory);
+ result = grpcReplicationClient.download(containerId);
+ } else {
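+        //If the previous attempt failed, replace it with a new download
+        //from the next datanode, then flatten the nested future with
+        //thenCompose.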
+ result = result.thenApply(CompletableFuture::completedFuture)
+ .exceptionally(t -> {
+ LOG.error("Error on replicating container: " + containerId, t);
+ GrpcReplicationClient grpcReplicationClient =
+ new GrpcReplicationClient(datanode.getIpAddress(),
+ datanode.getPort(Name.STANDALONE).getValue(),
+ workingDirectory);
+ return grpcReplicationClient.download(containerId);
+ }).thenCompose(Function.identity());
+ }
+ } catch (Exception ex) {
+ LOG.error(String.format(
+ "Container %s download from datanode %s was unsuccessful. "
+ + "Trying the next datanode", containerId, datanode), ex);
+ }
+
+ }
+ return result;
+
+ }
+
+ @Override
+ public void close() throws IOException {
+    try {
+      //shutdown() is required, otherwise awaitTermination blocks for the
+      //full timeout and the executor never stops.
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Can't stop container downloader gracefully", e);
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000000..38a853c72a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+/**
+ * Classes to replicate container data between datanodes.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
new file mode 100644
index 0000000000..6a14d333e2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test replication command handler.
+ */
+public class TestReplicateContainerCommandHandler {
+
+ private static final String EXCEPTION_MESSAGE = "Oh my god";
+ private ReplicateContainerCommandHandler handler;
+ private StubDownloader downloader;
+ private ReplicateContainerCommand command;
+  private List<Long> importedContainerIds;
+
+ @Before
+ public void init() {
+ importedContainerIds = new ArrayList<>();
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ContainerSet containerSet = Mockito.mock(ContainerSet.class);
+ ContainerDispatcher containerDispatcher =
+ Mockito.mock(ContainerDispatcher.class);
+
+ downloader = new StubDownloader();
+
+ handler = new ReplicateContainerCommandHandler(conf, containerSet,
+ containerDispatcher, downloader) {
+ @Override
+ protected void importContainer(long containerID, Path tarFilePath) {
+ importedContainerIds.add(containerID);
+ }
+ };
+
+ //the command
+ ArrayList datanodeDetails = new ArrayList<>();
+ datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+ datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+
+ command = new ReplicateContainerCommand(1L, datanodeDetails);
+ }
+
+ @Test
+ public void handle() throws TimeoutException, InterruptedException {
+ //GIVEN
+
+ //WHEN
+ handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
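+    //THEN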
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+ Assert.assertNotNull(downloader.futureByContainers.get(1L));
+ downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
+
+    GenericTestUtils
+        .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
+ }
+
+ @Test
+ public void handleWithErrors() throws TimeoutException, InterruptedException {
+ //GIVEN
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(ReplicateContainerCommandHandler.LOG);
+
+ //WHEN
+ handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+ //THEN
+
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+ Assert.assertNotNull(downloader.futureByContainers.get(1L));
+ downloader.futureByContainers.get(1L)
+ .completeExceptionally(new IllegalArgumentException(
+ EXCEPTION_MESSAGE));
+
+    GenericTestUtils.waitFor(() -> {
+      String output = logCapturer.getOutput();
+      return output.contains("unsuccessful")
+          && output.contains(EXCEPTION_MESSAGE);
+    }, 100, 2000);
+ }
+
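+  /**
+   * ContainerDownloader stub which records a future for each requested
+   * container so the test can complete it by hand.
+   */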
+ private static class StubDownloader implements ContainerDownloader {
+
+ private Map> futureByContainers =
+ new HashMap<>();
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+    public CompletableFuture<Path> getContainerDataFromReplicas(
+        long containerId, List<DatanodeDetails> sources) {
+      CompletableFuture<Path> future = new CompletableFuture<>();
+ futureByContainers.put(containerId, future);
+ return future;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 0000000000..05ac76d143
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for command handlers.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
new file mode 100644
index 0000000000..7391b25b42
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .DatanodeBlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
+ .writeChunkForContainer;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests ozone container replication.
+ */
+public class TestContainerReplication {
+ /**
+ * Set the timeout for every test.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300000);
+
+ @Test
+ public void testContainerReplication() throws Exception {
+ //GIVEN
+ OzoneConfiguration conf = newOzoneConfiguration();
+
+ long containerId = 1L;
+
+ conf.setSocketAddr("hdls.datanode.http-address",
+ new InetSocketAddress("0.0.0.0", 0));
+
+ MiniOzoneCluster cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+ .setRandomContainerPort(true).build();
+ cluster.waitForClusterToBeReady();
+
+ HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
+
+ //copy from the first datanode
+ List sourceDatanodes = new ArrayList<>();
+ sourceDatanodes.add(firstDatanode.getDatanodeDetails());
+
+ Pipeline sourcePipelines =
+ ContainerTestHelper.createPipeline(sourceDatanodes);
+
+ //create a new client
+ XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
+ client.connect();
+
+ //New container for testing
+ TestOzoneContainer.createContainerForTesting(client, containerId);
+
+ ContainerCommandRequestProto requestProto =
+ writeChunkForContainer(client, containerId, 1024);
+
+ DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
+
+ // Put Key to the test container
+ ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
+ .getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
+
+ ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
+
+ ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
+
+ Assert.assertNotNull(response);
+ Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertEquals(putKeyRequest.getTraceID(), response.getTraceID());
+
+ HddsDatanodeService destinationDatanode =
+ chooseDatanodeWithoutContainer(sourcePipelines,
+ cluster.getHddsDatanodes());
+
+ //WHEN: send the order to replicate the container
+ cluster.getStorageContainerManager().getScmNodeManager()
+ .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
+ new ReplicateContainerCommand(containerId,
+ sourcePipelines.getMachines()));
+
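+    //wait for the replication to be finished on the destination datanode
+    //(a fixed sleep keeps this first test simple)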
+ Thread.sleep(3000);
+
+ OzoneContainer ozoneContainer =
+ destinationDatanode.getDatanodeStateMachine().getContainer();
+
+    Container container =
+        ozoneContainer.getContainerSet().getContainer(containerId);
+
+ Assert.assertNotNull(
+ "Container is not replicated to the destination datanode",
+ container);
+
+ Assert.assertNotNull(
+ "ContainerData of the replicated container is null",
+ container.getContainerData());
+
+ long keyCount = ((KeyValueContainerData) container.getContainerData())
+ .getKeyCount();
+
+ KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
+ .getHandler(ContainerType.KeyValueContainer);
+
+ KeyData key = handler.getKeyManager()
+ .getKey(container, BlockID.getFromProtobuf(blockID));
+
+ Assert.assertNotNull(key);
+ Assert.assertEquals(1, key.getChunks().size());
+ Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
+ key.getChunks().get(0));
+
+ }
+
+  private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
+      List<HddsDatanodeService> dataNodes) {
+ for (HddsDatanodeService datanode : dataNodes) {
+ if (!pipeline.getMachines().contains(datanode.getDatanodeDetails())) {
+ return datanode;
+ }
+ }
+ throw new AssertionError("No datanode outside of the pipeline");
+ }
+
+ static OzoneConfiguration newOzoneConfiguration() {
+ final OzoneConfiguration conf = new OzoneConfiguration();
+ return conf;
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
deleted file mode 100644
index 9e08212629..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_SIZE;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the datanode, when replicate container command is
- * received.
- */
-public class TestReplicateContainerHandler {
-
- @Test
- public void test() throws IOException, TimeoutException, InterruptedException,
- OzoneException {
-
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(ReplicateContainerCommandHandler.LOG);
-
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
- MiniOzoneCluster cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
- cluster.waitForClusterToBeReady();
-
- DatanodeDetails datanodeDetails =
- cluster.getHddsDatanodes().get(0).getDatanodeDetails();
- //send the order to replicate the container
- cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new ReplicateContainerCommand(1L,
- new ArrayList<>()));
-
- //TODO: here we test only the serialization/unserialization as
- // the implementation is not yet done
- GenericTestUtils
- .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
- 5 * 1000);
-
- }
-
-}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f112d26841..5dd88fb584 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -513,7 +513,7 @@ private static XceiverClientGrpc createClientForTesting(
return new XceiverClientGrpc(pipeline, conf);
}
- private static void createContainerForTesting(XceiverClientSpi client,
+ public static void createContainerForTesting(XceiverClientSpi client,
long containerID) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request =
@@ -525,7 +525,7 @@ private static void createContainerForTesting(XceiverClientSpi client,
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}
- private static ContainerProtos.ContainerCommandRequestProto
+ public static ContainerProtos.ContainerCommandRequestProto
writeChunkForContainer(XceiverClientSpi client,
long containerID, int dataLen) throws Exception {
// Write Chunk