From b9932162e9eb4acc9c790fc3c4938a5057fc1658 Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Tue, 4 Sep 2018 23:41:50 +0530 Subject: [PATCH] HDDS-75. Support for CopyContainer. Contributed by Elek, Marton. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 + .../proto/DatanodeContainerProtocol.proto | 12 +- .../src/main/resources/ozone-default.xml | 10 +- .../container/common/interfaces/Handler.java | 15 +- .../statemachine/DatanodeStateMachine.java | 25 +-- .../ReplicateContainerCommandHandler.java | 124 ++++++++++++- .../transport/server/XceiverServerGrpc.java | 12 +- .../container/keyvalue/KeyValueContainer.java | 3 +- .../container/keyvalue/KeyValueHandler.java | 104 ++++++----- .../container/ozoneimpl/OzoneContainer.java | 9 +- .../replication/ContainerDownloader.java | 40 ++++ .../ContainerReplicationSource.java | 49 +++++ .../replication/ContainerStreamingOutput.java | 45 +++++ .../replication/GrpcReplicationClient.java | 169 +++++++++++++++++ .../replication/GrpcReplicationService.java | 130 +++++++++++++ .../OnDemandContainerReplicationSource.java | 76 ++++++++ .../SimpleContainerDownloader.java | 121 ++++++++++++ .../container/replication/package-info.java | 21 +++ .../TestReplicateContainerCommandHandler.java | 146 +++++++++++++++ .../commandhandler/package-info.java | 22 +++ .../container/TestContainerReplication.java | 175 ++++++++++++++++++ .../TestReplicateContainerHandler.java | 70 ------- .../ozoneimpl/TestOzoneContainer.java | 4 +- 23 files changed, 1246 insertions(+), 139 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 8f53da58b4..0f2b108848 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -265,6 +265,9 @@ public final class OzoneConfigKeys {
   public static final long
       HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
 
+  public static final String OZONE_CONTAINER_COPY_WORKDIR =
+      "hdds.datanode.replication.work.dir";
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 930f314515..ba0d2d445e 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -430,16 +430,22 @@ message CopyContainerRequestProto {
 }
 
 message CopyContainerResponseProto {
-  required string archiveName = 1;
+  required int64 containerID = 1;
   required uint64 readOffset = 2;
   required uint64 len = 3;
   required bool eof = 4;
-  repeated bytes data = 5;
+  required bytes data = 5;
   optional int64 checksum = 6;
 }
 
 service XceiverClientProtocolService {
   // A client-to-datanode RPC to send container commands
   rpc send(stream ContainerCommandRequestProto) returns
-      (stream ContainerCommandResponseProto) {}
+      (stream ContainerCommandResponseProto) {};
+
+}
+
+service IntraDatanodeProtocolService {
+  // An intra-datanode service to copy the raw container data between nodes
+  rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
 }
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a9fd10b21a..ca3da41747 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1124,4 +1124,12 @@
       on. Right now, we have SSD and DISK as profile options.
     </description>
   </property>
-</configuration>
\ No newline at end of file
+
+  <property>
+    <name>hdds.datanode.replication.work.dir</name>
+    <tag>DATANODE</tag>
+    <description>Temporary directory which is used during the container
+      replication between datanodes. Should have enough space to store
+      multiple containers (in compressed format), but doesn't require fast
+      IO access such as SSD.
+    </description>
+  </property>
+</configuration>
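The new IntraDatanodeProtocolService streams a container archive back as a sequence of CopyContainerResponseProto chunks; readOffset and len describe each chunk's position in the archive and eof marks the last one. A minimal sketch of how a receiver could reassemble and sanity-check such a stream follows (the ChunkReassembler class is illustrative only, not part of this patch):

    // Sketch: collect CopyContainerResponseProto chunks into one archive,
    // verifying the readOffset bookkeeping defined by the protocol above.
    class ChunkReassembler {
      private final java.io.ByteArrayOutputStream archive =
          new java.io.ByteArrayOutputStream();
      private long expectedOffset = 0;

      void onChunk(long readOffset, byte[] data, boolean eof)
          throws java.io.IOException {
        if (readOffset != expectedOffset) {
          throw new java.io.IOException("Out-of-order chunk: expected "
              + expectedOffset + " but got " + readOffset);
        }
        archive.write(data);
        expectedOffset += data.length;
        // When eof is true, archive holds the complete compressed container.
      }
    }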
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 15eed4faa8..53e1c68a4e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -30,7 +33,7 @@
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -67,6 +70,16 @@ public static Handler getHandlerForContainerType(ContainerType containerType,
   public abstract ContainerCommandResponseProto handle(
       ContainerCommandRequestProto msg, Container container);
 
+  /**
+   * Import container data from a raw input stream.
+   */
+  public abstract Container importContainer(
+      long containerID,
+      long maxSize,
+      FileInputStream rawContainerStream,
+      TarContainerPacker packer)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 0a23912ff2..875d0638d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -16,12 +16,17 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -39,15 +44,12 @@
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * State Machine Class.
  */
@@ -87,13 +89,14 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
         new OzoneConfiguration(conf), context);
     nextHB = new AtomicLong(Time.monotonicNow());
 
-    // When we add new handlers just adding a new handler here should do the
+    // When we add new handlers just adding a new handler here should do the
     // trick.
commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) - .addHandler(new ReplicateContainerCommandHandler()) + .addHandler(new ReplicateContainerCommandHandler(conf, + container.getContainerSet(), container.getDispatcher())) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index fe1d4e81af..d1895a8985 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -16,14 +16,32 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import java.io.FileInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ContainerDownloader; +import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; @@ -33,22 +51,120 @@ * Command handler to copy containers from sources. 
 */
 public class ReplicateContainerCommandHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
+
+  private ContainerDispatcher containerDispatcher;
+
   private int invocationCount;
 
   private long totalTime;
 
-  private boolean cmdExecuted;
+  private ContainerDownloader downloader;
+
+  private Configuration conf;
+
+  private TarContainerPacker packer = new TarContainerPacker();
+
+  private ContainerSet containerSet;
+
+  private Lock lock = new ReentrantLock();
+
+  public ReplicateContainerCommandHandler(
+      Configuration conf,
+      ContainerSet containerSet,
+      ContainerDispatcher containerDispatcher,
+      ContainerDownloader downloader) {
+    this.conf = conf;
+    this.containerSet = containerSet;
+    this.downloader = downloader;
+    this.containerDispatcher = containerDispatcher;
+  }
+
+  public ReplicateContainerCommandHandler(
+      Configuration conf,
+      ContainerSet containerSet,
+      ContainerDispatcher containerDispatcher) {
+    this(conf, containerSet, containerDispatcher,
+        new SimpleContainerDownloader(conf));
+  }
 
   @Override
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.warn("Replicate command is not yet handled");
+
+    ReplicateContainerCommand replicateCommand =
+        (ReplicateContainerCommand) command;
     try {
-      cmdExecuted = true;
+
+      long containerID = replicateCommand.getContainerID();
+      LOG.info("Starting replication of container {} from {}", containerID,
+          replicateCommand.getSourceDatanodes());
+      CompletableFuture<Path> tempTarFile = downloader
+          .getContainerDataFromReplicas(containerID,
+              replicateCommand.getSourceDatanodes());
+
+      CompletableFuture<Void> result =
+          tempTarFile.thenAccept(path -> {
+            LOG.info("Container {} is downloaded, starting to import.",
+                containerID);
+            importContainer(containerID, path);
+          });
+
+      result.whenComplete((aVoid, throwable) -> {
+        if (throwable != null) {
+          LOG.error("Container replication was unsuccessful.", throwable);
+        } else {
+          LOG.info("Container {} is replicated successfully", containerID);
+        }
+      });
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      updateCommandStatus(context, command, true, LOG);
+
     }
   }
+
+  protected void importContainer(long containerID, Path tarFilePath) {
+    lock.lock();
+    try {
+      ContainerData originalContainerData;
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+        byte[] containerDescriptorYaml =
+            packer.unpackContainerDescriptor(tempContainerTarStream);
+        originalContainerData = ContainerDataYaml.readContainer(
+            containerDescriptorYaml);
+      }
+
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+
+        Handler handler = containerDispatcher.getHandler(
+            originalContainerData.getContainerType());
+
+        Container container = handler.importContainer(containerID,
+            originalContainerData.getMaxSize(),
+            tempContainerTarStream,
+            packer);
+
+        containerSet.addContainer(container);
+      }
+
+    } catch (Exception e) {
+      LOG.error(
+          "Can't import the downloaded container data id=" + containerID,
+          e);
+      try {
+        Files.delete(tarFilePath);
+      } catch (Exception ex) {
+        LOG.error(
+            "Container import failed and the downloaded file can't be "
+                + "deleted: "
+                + tarFilePath.toAbsolutePath().toString(), ex);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 4dc232db7b..4a90144f4e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; + +import org.apache.ratis.shaded.io.grpc.BindableService; import org.apache.ratis.shaded.io.grpc.Server; import org.apache.ratis.shaded.io.grpc.ServerBuilder; import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; @@ -54,7 +56,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { * @param conf - Configuration */ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, - ContainerDispatcher dispatcher) { + ContainerDispatcher dispatcher, BindableService... additionalServices) { Preconditions.checkNotNull(conf); this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, @@ -80,6 +82,14 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf, .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) .addService(new GrpcXceiverService(dispatcher)) .build(); + NettyServerBuilder nettyServerBuilder = + ((NettyServerBuilder) ServerBuilder.forPort(port)) + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .addService(new GrpcXceiverService(dispatcher)); + for (BindableService service : additionalServices) { + nettyServerBuilder.addService(service); + } + server = nettyServerBuilder.build(); storageContainer = dispatcher; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index f60c5e9322..b893a389f1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -35,7 +35,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.io.nativeio.NativeIO; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e25483305c..5acecb4f04 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -18,9 +18,15 @@ package org.apache.hadoop.ozone.container.keyvalue; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; 
+import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.BlockID; @@ -35,80 +41,72 @@ .ContainerType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .GetSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .KeyValue; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .PutSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Type; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; -import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume .RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; +import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager; -import org.apache.hadoop.ozone.container.keyvalue.statemachine - .background.BlockDeletingService; +import org.apache.hadoop.ozone.container.keyvalue.statemachine.background + .BlockDeletingService; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.ReflectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_INTERNAL_ERROR; + .Result.BLOCK_NOT_COMMITTED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CLOSED_CONTAINER_IO; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.DELETE_ON_OPEN_CONTAINER; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.IO_EXCEPTION; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.INVALID_CONTAINER_STATE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.GET_SMALL_FILE_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.INVALID_CONTAINER_STATE; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.IO_EXCEPTION; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.PUT_SMALL_FILE_ERROR; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.BLOCK_NOT_COMMITTED; - import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Stage; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone - .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; -import static org.apache.hadoop.hdds.HddsConfigKeys - .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handler for KeyValue Container type. 
@@ -831,4 +829,22 @@ private void checkContainerOpen(KeyValueContainer kvContainer) throw new StorageContainerException(msg, result); } } + + public Container importContainer(long containerID, long maxSize, + FileInputStream rawContainerStream, + TarContainerPacker packer) + throws IOException { + + KeyValueContainerData containerData = + new KeyValueContainerData(containerID, + maxSize); + + KeyValueContainer container = new KeyValueContainer(containerData, + conf); + + populateContainerPathFields(container, maxSize); + container.importContainerData(rawContainerStream, packer); + return container; + + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 812777b6d4..b1bf38129c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -35,6 +35,9 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; +import org.apache.hadoop.ozone.container.replication + .OnDemandContainerReplicationSource; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +82,7 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration context); server = new XceiverServerSpi[]{ new XceiverServerGrpc(datanodeDetails, this.config, this - .hddsDispatcher), + .hddsDispatcher, createReplicationService()), XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this .config, hddsDispatcher) }; @@ -87,6 +90,10 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration } + private GrpcReplicationService createReplicationService() { + return new GrpcReplicationService( + new OnDemandContainerReplicationSource(containerSet)); + } /** * Build's container map. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java new file mode 100644 index 0000000000..9511241fb5 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.container.replication; + +import java.io.Closeable; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +/** + * Service to download container data from other datanodes. + *
<p>
+ * The implementation of this interface should copy the raw container data in
+ * compressed form to the working directory.
+ * <p>
+ * A smart implementation would use multiple sources to do parallel download.
+ */
+public interface ContainerDownloader extends Closeable {
+
+  CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+      List<DatanodeDetails> sources);
+
+}
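For orientation: a caller is expected to consume this interface asynchronously, reacting when the download from one of the replicas completes or fails. A sketch under the assumption that downloader, containerId, sources and LOG are already in scope (this mirrors how ReplicateContainerCommandHandler uses the interface, but is not code from this patch):

    CompletableFuture<Path> future =
        downloader.getContainerDataFromReplicas(containerId, sources);
    future.whenComplete((path, error) -> {
      if (error != null) {
        LOG.error("Download of container {} failed", containerId, error);
      } else {
        LOG.info("Container {} downloaded to {}", containerId, path);
      }
    });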
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
new file mode 100644
index 0000000000..69582f799f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Contract to prepare and provide the container in binary form.
+ * <p>
+ * Prepare will be called when a container is closed. An implementation could
+ * pre-cache any binary representation of a container and store the pre-packed
+ * images.
+ */
+public interface ContainerReplicationSource {
+
+  /**
+   * Prepare for the replication.
+   *
+   * @param containerId The ID of the container to package.
+   */
+  void prepare(long containerId);
+
+  /**
+   * Copy the container data to an output stream.
+   *
+   * @param containerId Container to replicate
+   * @param destination The destination stream to copy all the container data.
+   * @throws IOException
+   */
+  void copyData(long containerId, OutputStream destination)
+      throws IOException;
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
new file mode 100644
index 0000000000..f7fd8a4957
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.OutputStream; + +/** + * JAX-RS streaming output to return the binary container data. + */ +public class ContainerStreamingOutput implements StreamingOutput { + + private long containerId; + + private ContainerReplicationSource containerReplicationSource; + + public ContainerStreamingOutput(long containerId, + ContainerReplicationSource containerReplicationSource) { + this.containerId = containerId; + this.containerReplicationSource = containerReplicationSource; + } + + @Override + public void write(OutputStream outputStream) + throws IOException, WebApplicationException { + containerReplicationSource.copyData(containerId, outputStream); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java new file mode 100644 index 0000000000..91d098f0b0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.base.Preconditions;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to read container data from Grpc.
+ */
+public class GrpcReplicationClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcReplicationClient.class);
+
+  private final ManagedChannel channel;
+
+  private final IntraDatanodeProtocolServiceStub client;
+
+  private final Path workingDirectory;
+
+  public GrpcReplicationClient(String host,
+      int port, Path workingDir) {
+
+    channel = NettyChannelBuilder.forAddress(host, port)
+        .usePlaintext()
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .build();
+    client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
+    this.workingDirectory = workingDir;
+
+  }
+
+  public CompletableFuture<Path> download(long containerId) {
+    CopyContainerRequestProto request =
+        CopyContainerRequestProto.newBuilder()
+            .setContainerID(containerId)
+            .setLen(-1)
+            .setReadOffset(0)
+            .build();
+
+    CompletableFuture<Path> response = new CompletableFuture<>();
+
+    Path destinationPath =
+        getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
+
+    client.download(request,
+        new StreamDownloader(containerId, response, destinationPath));
+    return response;
+  }
+
+  private Path getWorkingDirectory() {
+    return workingDirectory;
+  }
+
+  public void shutdown() {
+    channel.shutdown();
+  }
+
+  /**
+   * Grpc stream observer to CompletableFuture adapter.
+   */
+  public static class StreamDownloader
+      implements StreamObserver<CopyContainerResponseProto> {
+
+    private final CompletableFuture<Path> response;
+
+    private final long containerId;
+
+    private BufferedOutputStream stream;
+
+    private Path outputPath;
+
+    public StreamDownloader(long containerId,
+        CompletableFuture<Path> response,
+        Path outputPath) {
+      this.response = response;
+      this.containerId = containerId;
+      this.outputPath = outputPath;
+      try {
+        outputPath = Preconditions.checkNotNull(outputPath);
+        Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
+        Files.createDirectories(parentPath);
+        stream =
+            new BufferedOutputStream(new FileOutputStream(outputPath.toFile()));
+      } catch (IOException e) {
+        throw new RuntimeException("OutputPath can't be used: " + outputPath,
+            e);
+      }
+
+    }
+
+    @Override
+    public void onNext(CopyContainerResponseProto chunk) {
+      try {
+        stream.write(chunk.getData().toByteArray());
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      try {
+        stream.close();
+        LOG.error("Container download was unsuccessful", throwable);
+        try {
+          Files.delete(outputPath);
+        } catch (IOException ex) {
+          LOG.error(
+              "Error happened during the download but can't delete the "
+                  + "temporary destination.", ex);
+        }
+        response.completeExceptionally(throwable);
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      try {
+        stream.close();
+        response.complete(outputPath);
+        LOG.info("Container is downloaded to {}", outputPath);
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+
+    }
+  }
+
+}
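StreamDownloader adapts gRPC's push-style StreamObserver to a pull-style CompletableFuture. The same adapter shape, reduced to a generic sketch (a hypothetical helper, not part of this patch; StreamObserver and CompletableFuture are the same types used above):

    // Generic observer-to-future adapter: remember what arrives, complete
    // the future on the terminal callback.
    class FutureAdapter<T> implements StreamObserver<T> {
      private final CompletableFuture<T> future = new CompletableFuture<>();
      private T last;

      @Override public void onNext(T value) { last = value; }
      @Override public void onError(Throwable t) {
        future.completeExceptionally(t);
      }
      @Override public void onCompleted() { future.complete(last); }

      CompletableFuture<T> getFuture() { return future; }
    }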
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
new file mode 100644
index 0000000000..d8f696f47d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to make containers available for replication.
+ */
+public class GrpcReplicationService extends
+    IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcReplicationService.class);
+
+  private final ContainerReplicationSource containerReplicationSource;
+
+  public GrpcReplicationService(
+      ContainerReplicationSource containerReplicationSource) {
+    this.containerReplicationSource = containerReplicationSource;
+  }
+
+  @Override
+  public void download(CopyContainerRequestProto request,
+      StreamObserver<CopyContainerResponseProto> responseObserver) {
+    LOG.info("Streaming container data ({}) to other datanode",
+        request.getContainerID());
+    try {
+      GrpcOutputStream outputStream =
+          new GrpcOutputStream(responseObserver, request.getContainerID());
+      containerReplicationSource
+          .copyData(request.getContainerID(), outputStream);
+
+    } catch (IOException e) {
+      LOG.error("Can't stream the container data", e);
+      responseObserver.onError(e);
+    }
+  }
+
+  private static class GrpcOutputStream extends OutputStream
+      implements Closeable {
+
+    private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024;
+
+    private final StreamObserver<CopyContainerResponseProto> responseObserver;
+
+    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+    private long containerId;
+
+    private int readOffset = 0;
+
+    private int writtenBytes;
+
+    GrpcOutputStream(
+        StreamObserver<CopyContainerResponseProto> responseObserver,
+        long containerId) {
+      this.responseObserver = responseObserver;
+      this.containerId = containerId;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      try {
+        buffer.write(b);
+        if (buffer.size() > BUFFER_SIZE_IN_BYTES) {
+          flushBuffer(false);
+        }
+      } catch (Exception ex) {
+        responseObserver.onError(ex);
+      }
+    }
+
+    private void flushBuffer(boolean eof) {
+      if (buffer.size() > 0) {
+        CopyContainerResponseProto response =
+            CopyContainerResponseProto.newBuilder()
+                .setContainerID(containerId)
+                .setData(ByteString.copyFrom(buffer.toByteArray()))
+                .setEof(eof)
+                .setReadOffset(readOffset)
+                .setLen(buffer.size())
+                .build();
+        responseObserver.onNext(response);
+        readOffset += buffer.size();
+        writtenBytes += buffer.size();
+        buffer.reset();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushBuffer(true);
+      LOG.info("{} bytes written to the rpc stream from container {}",
+          writtenBytes, containerId);
+      responseObserver.onCompleted();
+    }
+  }
+}
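GrpcOutputStream above inverts the usual direction: it exposes a plain OutputStream whose write() calls are batched into roughly 1 MB CopyContainerResponseProto chunks, while close() flushes the final chunk with eof set and completes the gRPC stream. Hypothetical usage, assuming responseObserver, containerId and a source archivePath are in scope (GrpcOutputStream itself is private to the service, so this is for illustration only):

    // Copying a file through the adapter emits a chunked gRPC response stream.
    try (OutputStream out = new GrpcOutputStream(responseObserver, containerId)) {
      Files.copy(archivePath, out); // each ~1 MB of data becomes one onNext()
    } // close() sends the last chunk (eof=true) and calls onCompleted()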
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
new file mode 100644
index 0000000000..d557b548b4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A naive implementation of the replication source which creates a tar file
+ * on-demand without pre-creating the compressed archives.
+ */
+public class OnDemandContainerReplicationSource
+    implements ContainerReplicationSource {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerReplicationSource.class);
+
+  private ContainerSet containerSet;
+
+  private ContainerPacker packer = new TarContainerPacker();
+
+  public OnDemandContainerReplicationSource(
+      ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public void prepare(long containerId) {
+
+  }
+
+  @Override
+  public void copyData(long containerId, OutputStream destination)
+      throws IOException {
+
+    Container container = containerSet.getContainer(containerId);
+
+    Preconditions
+        .checkNotNull(container, "Container is not found " + containerId);
+
+    switch (container.getContainerType()) {
+    case KeyValueContainer:
+      packer.pack(container,
+          destination);
+      break;
+    default:
+      LOG.warn("Container type " + container.getContainerType()
+          + " is not replicable as there is no compression algorithm for it.");
+    }
+
+  }
+}
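Because this source packs on demand, prepare() is intentionally empty. The ContainerReplicationSource javadoc also allows an eager strategy: pack once when the container closes, then serve the cached archive. A hypothetical sketch of that variant (PrePackedReplicationSource and its cache handling are assumptions, not part of this patch):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical eager variant: pack the archive once, when the container
    // closes, and serve the cached file afterwards.
    public class PrePackedReplicationSource
        implements ContainerReplicationSource {

      private final Map<Long, Path> cachedArchives = new ConcurrentHashMap<>();

      @Override
      public void prepare(long containerId) {
        // pack the closed container to <workdir>/container-<id>.tar.gz
        // and remember the resulting path in cachedArchives
      }

      @Override
      public void copyData(long containerId, OutputStream destination)
          throws IOException {
        Files.copy(cachedArchives.get(containerId), destination);
      }
    }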
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
new file mode 100644
index 0000000000..a461a98f23
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple ContainerDownloader implementation to download the missing container
+ * from the first available datanode.
+ * <p>
+ * This is not the most effective implementation as it uses only one source
+ * for the container download.
+ */
+public class SimpleContainerDownloader implements ContainerDownloader {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SimpleContainerDownloader.class);
+
+  private final Path workingDirectory;
+
+  private ExecutorService executor;
+
+  public SimpleContainerDownloader(Configuration conf) {
+
+    String workDirString =
+        conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
+
+    if (workDirString == null) {
+      workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
+          .resolve("container-copy");
+    } else {
+      workingDirectory = Paths.get(workDirString);
+    }
+
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Container downloader thread - %d").build();
+    executor = Executors.newSingleThreadExecutor(build);
+    LOG.info("Starting container downloader service to copy "
+        + "containers to replicate.");
+  }
+
+  @Override
+  public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+      List<DatanodeDetails> sourceDatanodes) {
+
+    CompletableFuture<Path> result = null;
+    for (DatanodeDetails datanode : sourceDatanodes) {
+      try {
+
+        if (result == null) {
+          GrpcReplicationClient grpcReplicationClient =
+              new GrpcReplicationClient(datanode.getIpAddress(),
+                  datanode.getPort(Name.STANDALONE).getValue(),
+                  workingDirectory);
+          result = grpcReplicationClient.download(containerId);
+        } else {
+          result = result.thenApply(CompletableFuture::completedFuture)
+              .exceptionally(t -> {
+                LOG.error("Error on replicating container: " + containerId, t);
+                GrpcReplicationClient grpcReplicationClient =
+                    new GrpcReplicationClient(datanode.getIpAddress(),
+                        datanode.getPort(Name.STANDALONE).getValue(),
+                        workingDirectory);
+                return grpcReplicationClient.download(containerId);
+              }).thenCompose(Function.identity());
+        }
+      } catch (Exception ex) {
+        LOG.error(String.format(
+            "Container %s download from datanode %s was unsuccessful. "
+                + "Trying the next datanode", containerId, datanode), ex);
+      }
+
+    }
+    return result;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Can't stop container downloader gracefully", e);
+    }
+  }
+}
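The loop in getContainerDataFromReplicas chains one future per source datanode: thenApply wraps a success into an already-completed future, exceptionally substitutes a fresh download from the next replica, and thenCompose flattens the resulting nested future. The idiom in isolation (downloadFrom is a hypothetical helper standing in for the GrpcReplicationClient call above):

    CompletableFuture<Path> result = null;
    for (DatanodeDetails datanode : sourceDatanodes) {
      if (result == null) {
        result = downloadFrom(datanode);                   // first candidate
      } else {
        result = result
            .thenApply(CompletableFuture::completedFuture) // success: keep it
            .exceptionally(t -> downloadFrom(datanode))    // failure: try next
            .thenCompose(Function.identity());             // flatten
      }
    }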
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000000..38a853c72a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+/**
+ Classes to replicate container data between datanodes.
+**/
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
new file mode 100644
index 0000000000..6a14d333e2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test replication command handler.
+ */
+public class TestReplicateContainerCommandHandler {
+
+  private static final String EXCEPTION_MESSAGE = "Oh my god";
+  private ReplicateContainerCommandHandler handler;
+  private StubDownloader downloader;
+  private ReplicateContainerCommand command;
+  private List<Long> importedContainerIds;
+
+  @Before
+  public void init() {
+    importedContainerIds = new ArrayList<>();
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ContainerSet containerSet = Mockito.mock(ContainerSet.class);
+    ContainerDispatcher containerDispatcher =
+        Mockito.mock(ContainerDispatcher.class);
+
+    downloader = new StubDownloader();
+
+    handler = new ReplicateContainerCommandHandler(conf, containerSet,
+        containerDispatcher, downloader) {
+      @Override
+      protected void importContainer(long containerID, Path tarFilePath) {
+        importedContainerIds.add(containerID);
+      }
+    };
+
+    //the command
+    ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
+    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+
+    command = new ReplicateContainerCommand(1L, datanodeDetails);
+  }
+
+  @Test
+  public void handle() throws TimeoutException, InterruptedException {
+    //GIVEN
+
+    //WHEN
+    handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+    Assert.assertNotNull(downloader.futureByContainers.get(1L));
+    downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
+
+    GenericTestUtils
+        .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
+  }
+
+  @Test
+  public void handleWithErrors() throws TimeoutException, InterruptedException {
+    //GIVEN
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(ReplicateContainerCommandHandler.LOG);
+
+    //WHEN
+    handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+    //THEN
+
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+    Assert.assertNotNull(downloader.futureByContainers.get(1L));
+    downloader.futureByContainers.get(1L)
+        .completeExceptionally(new IllegalArgumentException(
+            EXCEPTION_MESSAGE));
+
+    GenericTestUtils
+        .waitFor(() -> {
+          String output = logCapturer.getOutput();
+          return output.contains("unsuccessful") && output
+              .contains(EXCEPTION_MESSAGE);
+        }, 100, 2000);
+  }
+
+  private static class StubDownloader implements ContainerDownloader {
+
+    private Map<Long, CompletableFuture<Path>> futureByContainers =
+        new HashMap<>();
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public CompletableFuture<Path> getContainerDataFromReplicas(
+        long containerId, List<DatanodeDetails> sources) {
+      CompletableFuture<Path> future = new CompletableFuture<>();
+      futureByContainers.put(containerId, future);
+      return future;
+    }
+  }
+
+}
\ No newline at end of file
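The unit test above relies on one seam: the handler sees ContainerDownloader
only as an asynchronous source of a CompletableFuture<Path>, so the stub can
decide the outcome of a download after handle() has already returned, and the
test then only polls for the side effect. A minimal, self-contained sketch of
that pattern follows; the class and variable names here are illustrative and
not part of this patch:

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.concurrent.CompletableFuture;

    public final class AsyncStubSketch {

      public static void main(String[] args) {
        // The stub hands out a future it has not completed yet, exactly
        // like StubDownloader#getContainerDataFromReplicas above.
        CompletableFuture<Path> download = new CompletableFuture<>();

        // Production code registers its continuations up front...
        download
            .thenAccept(tar -> System.out.println("importing " + tar))
            .exceptionally(ex -> {
              System.out.println("replication was unsuccessful: " + ex);
              return null;
            });

        // ...and the test picks the branch later: complete(...) drives the
        // import path, completeExceptionally(...) drives the error path.
        download.complete(Paths.get("/tmp/container-1.tar.gz"));
      }
    }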
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 0000000000..05ac76d143
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for command handlers.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
new file mode 100644
index 0000000000..7391b25b42
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .DatanodeBlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
+    .writeChunkForContainer;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests ozone containers replication.
+ */
+public class TestContainerReplication {
+
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300000);
+
+  @Test
+  public void testContainerReplication() throws Exception {
+    //GIVEN
+    OzoneConfiguration conf = newOzoneConfiguration();
+
+    long containerId = 1L;
+
+    conf.setSocketAddr("hdls.datanode.http-address",
+        new InetSocketAddress("0.0.0.0", 0));
+
+    MiniOzoneCluster cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+            .setRandomContainerPort(true).build();
+    cluster.waitForClusterToBeReady();
+
+    HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
+
+    //copy from the first datanode
+    List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+    sourceDatanodes.add(firstDatanode.getDatanodeDetails());
+
+    Pipeline sourcePipelines =
+        ContainerTestHelper.createPipeline(sourceDatanodes);
+
+    //create a new client
+    XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
+    client.connect();
+
+    //New container for testing
+    TestOzoneContainer.createContainerForTesting(client, containerId);
+
+    ContainerCommandRequestProto requestProto =
+        writeChunkForContainer(client, containerId, 1024);
+
+    DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
+
+    // Put Key to the test container
+    ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
+        .getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
+
+    ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
+
+    ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(
+        putKeyRequest.getTraceID().equals(response.getTraceID()));
+
+    HddsDatanodeService destinationDatanode =
+        chooseDatanodeWithoutContainer(sourcePipelines,
+            cluster.getHddsDatanodes());
+
+    //WHEN: send the order to replicate the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
+            new ReplicateContainerCommand(containerId,
+                sourcePipelines.getMachines()));
+
+    Thread.sleep(3000);
+
+    OzoneContainer ozoneContainer =
+        destinationDatanode.getDatanodeStateMachine().getContainer();
+
+    Container container =
+        ozoneContainer.getContainerSet().getContainer(containerId);
+
+    Assert.assertNotNull(
+        "Container is not replicated to the destination datanode",
+        container);
+
+    Assert.assertNotNull(
+        "ContainerData of the replicated container is null",
+        container.getContainerData());
+
+    long keyCount = ((KeyValueContainerData) container.getContainerData())
+        .getKeyCount();
+
+    KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
+        .getHandler(ContainerType.KeyValueContainer);
+
+    KeyData key = handler.getKeyManager()
+        .getKey(container, BlockID.getFromProtobuf(blockID));
+
+    Assert.assertNotNull(key);
+    Assert.assertEquals(1, key.getChunks().size());
+    Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
+        key.getChunks().get(0));
+  }
+
+  private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
+      List<HddsDatanodeService> dataNodes) {
+    for (HddsDatanodeService datanode : dataNodes) {
+      if (!pipeline.getMachines().contains(datanode.getDatanodeDetails())) {
+        return datanode;
+      }
+    }
+    throw new AssertionError("No datanode outside of the pipeline");
+  }
+
+  static OzoneConfiguration newOzoneConfiguration() {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    return conf;
+  }
+
+}
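One detail of the integration test above may deserve a follow-up: it waits
for the replication with a fixed Thread.sleep(3000), which can be flaky on a
loaded machine. Since the handler unit test already uses the polling helper
from org.apache.hadoop.test.GenericTestUtils, the sleep could be replaced by
a bounded poll. A sketch, assuming it is dropped into the test method where
destinationDatanode and containerId are in scope, and that the container set
may be read while replication is still in progress:

    // Poll every second, for at most ten seconds, until the replicated
    // container shows up on the destination datanode.
    GenericTestUtils.waitFor(
        () -> destinationDatanode.getDatanodeStateMachine().getContainer()
            .getContainerSet().getContainer(containerId) != null,
        1000, 10000);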
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
deleted file mode 100644
index 9e08212629..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the datanode, when replicate container command is
- * received.
- */
-public class TestReplicateContainerHandler {
-
-  @Test
-  public void test() throws IOException, TimeoutException, InterruptedException,
-      OzoneException {
-
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(ReplicateContainerCommandHandler.LOG);
-
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
-    MiniOzoneCluster cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
-    cluster.waitForClusterToBeReady();
-
-    DatanodeDetails datanodeDetails =
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails();
-    //send the order to replicate the container
-    cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new ReplicateContainerCommand(1L,
-                new ArrayList<>()));
-
-    //TODO: here we test only the serialization/unserialization as
-    // the implementation is not yet done
-    GenericTestUtils
-        .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
-            5 * 1000);
-
-  }
-
-}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f112d26841..5dd88fb584 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -513,7 +513,7 @@ private static XceiverClientGrpc createClientForTesting(
     return new XceiverClientGrpc(pipeline, conf);
   }
 
-  private static void createContainerForTesting(XceiverClientSpi client,
+  public static void createContainerForTesting(XceiverClientSpi client,
       long containerID) throws Exception {
     // Create container
     ContainerProtos.ContainerCommandRequestProto request =
@@ -525,7 +525,7 @@ private static void createContainerForTesting(
     Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
   }
 
-  private static ContainerProtos.ContainerCommandRequestProto
+  public static ContainerProtos.ContainerCommandRequestProto
       writeChunkForContainer(XceiverClientSpi client,
       long containerID, int dataLen) throws Exception {
     // Write Chunk