HDDS-75. Support for CopyContainer. Contributed by Elek, Marton.

This commit is contained in:
Nanda kumar 2018-09-04 23:41:50 +05:30
parent 9e96ac666d
commit b9932162e9
23 changed files with 1246 additions and 139 deletions

View File

@ -265,6 +265,9 @@ public final class OzoneConfigKeys {
public static final long
HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
public static final String OZONE_CONTAINER_COPY_WORKDIR =
"hdds.datanode.replication.work.dir";
/**
* There is no need to instantiate this class.
*/

View File

@ -430,16 +430,22 @@ message CopyContainerRequestProto {
}
message CopyContainerResponseProto {
required string archiveName = 1;
required int64 containerID = 1;
required uint64 readOffset = 2;
required uint64 len = 3;
required bool eof = 4;
repeated bytes data = 5;
required bytes data = 5;
optional int64 checksum = 6;
}
service XceiverClientProtocolService {
// A client-to-datanode RPC to send container commands
rpc send(stream ContainerCommandRequestProto) returns
(stream ContainerCommandResponseProto) {}
(stream ContainerCommandResponseProto) {};
}
service IntraDatanodeProtocolService {
// An intradatanode service to copy the raw containerdata betwen nodes
rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
}

View File

@ -1124,4 +1124,12 @@
on. Right now, we have SSD and DISK as profile options.</description>
</property>
</configuration>
<property>
<name>hdds.datanode.replication.work.dir</name>
<tag>DATANODE</tag>
<description>Temporary which is used during the container replication
betweeen datanodes. Should have enough space to store multiple container
(in compressed format), but doesn't require fast io access such as SSD.
</description>
</property>
</configuration>

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
@ -30,7 +33,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@ -67,6 +70,16 @@ public static Handler getHandlerForContainerType(ContainerType containerType,
public abstract ContainerCommandResponseProto handle(
ContainerCommandRequestProto msg, Container container);
/**
* Import container data from a raw input stream.
*/
public abstract Container importContainer(
long containerID,
long maxSize,
FileInputStream rawContainerStream,
TarContainerPacker packer)
throws IOException;
public void setScmID(String scmId) {
this.scmID = scmId;
}

View File

@ -16,12 +16,17 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -39,15 +44,12 @@
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* State Machine Class.
*/
@ -87,13 +89,14 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
new OzoneConfiguration(conf), context);
nextHB = new AtomicLong(Time.monotonicNow());
// When we add new handlers just adding a new handler here should do the
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
.addHandler(new ReplicateContainerCommandHandler())
.addHandler(new ReplicateContainerCommandHandler(conf,
container.getContainerSet(), container.getDispatcher()))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)

View File

@ -16,14 +16,32 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
@ -33,22 +51,120 @@
* Command handler to copy containers from sources.
*/
public class ReplicateContainerCommandHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
private ContainerDispatcher containerDispatcher;
private int invocationCount;
private long totalTime;
private boolean cmdExecuted;
private ContainerDownloader downloader;
private Configuration conf;
private TarContainerPacker packer = new TarContainerPacker();
private ContainerSet containerSet;
private Lock lock = new ReentrantLock();
public ReplicateContainerCommandHandler(
Configuration conf,
ContainerSet containerSet,
ContainerDispatcher containerDispatcher,
ContainerDownloader downloader) {
this.conf = conf;
this.containerSet = containerSet;
this.downloader = downloader;
this.containerDispatcher = containerDispatcher;
}
public ReplicateContainerCommandHandler(
Configuration conf,
ContainerSet containerSet,
ContainerDispatcher containerDispatcher) {
this(conf, containerSet, containerDispatcher,
new SimpleContainerDownloader(conf));
}
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.warn("Replicate command is not yet handled");
ReplicateContainerCommand replicateCommand =
(ReplicateContainerCommand) command;
try {
cmdExecuted = true;
long containerID = replicateCommand.getContainerID();
LOG.info("Starting replication of container {} from {}", containerID,
replicateCommand.getSourceDatanodes());
CompletableFuture<Path> tempTarFile = downloader
.getContainerDataFromReplicas(containerID,
replicateCommand.getSourceDatanodes());
CompletableFuture<Void> result =
tempTarFile.thenAccept(path -> {
LOG.info("Container {} is downloaded, starting to import.",
containerID);
importContainer(containerID, path);
});
result.whenComplete((aVoid, throwable) -> {
if (throwable != null) {
LOG.error("Container replication was unsuccessful .", throwable);
} else {
LOG.info("Container {} is replicated successfully", containerID);
}
});
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
updateCommandStatus(context, command, true, LOG);
}
}
protected void importContainer(long containerID, Path tarFilePath) {
lock.lock();
try {
ContainerData originalContainerData;
try (FileInputStream tempContainerTarStream = new FileInputStream(
tarFilePath.toFile())) {
byte[] containerDescriptorYaml =
packer.unpackContainerDescriptor(tempContainerTarStream);
originalContainerData = ContainerDataYaml.readContainer(
containerDescriptorYaml);
}
try (FileInputStream tempContainerTarStream = new FileInputStream(
tarFilePath.toFile())) {
Handler handler = containerDispatcher.getHandler(
originalContainerData.getContainerType());
Container container = handler.importContainer(containerID,
originalContainerData.getMaxSize(),
tempContainerTarStream,
packer);
containerSet.addContainer(container);
}
} catch (Exception e) {
LOG.error(
"Can't import the downloaded container data id=" + containerID,
e);
try {
Files.delete(tarFilePath);
} catch (Exception ex) {
LOG.error(
"Container import is failed and the downloaded file can't be "
+ "deleted: "
+ tarFilePath.toAbsolutePath().toString());
}
} finally {
lock.unlock();
}
}

View File

@ -26,6 +26,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.shaded.io.grpc.BindableService;
import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
@ -54,7 +56,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
* @param conf - Configuration
*/
public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
ContainerDispatcher dispatcher) {
ContainerDispatcher dispatcher, BindableService... additionalServices) {
Preconditions.checkNotNull(conf);
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
@ -80,6 +82,14 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher))
.build();
NettyServerBuilder nettyServerBuilder =
((NettyServerBuilder) ServerBuilder.forPort(port))
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher));
for (BindableService service : additionalServices) {
nettyServerBuilder.addService(service);
}
server = nettyServerBuilder.build();
storageContainer = dispatcher;
}

View File

@ -35,7 +35,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.io.nativeio.NativeIO;

View File

@ -18,9 +18,15 @@
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
@ -35,80 +41,72 @@
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.keyvalue.statemachine
.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
.Result.BLOCK_NOT_COMMITTED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.BLOCK_NOT_COMMITTED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Stage;
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handler for KeyValue Container type.
@ -831,4 +829,22 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
throw new StorageContainerException(msg, result);
}
}
public Container importContainer(long containerID, long maxSize,
FileInputStream rawContainerStream,
TarContainerPacker packer)
throws IOException {
KeyValueContainerData containerData =
new KeyValueContainerData(containerID,
maxSize);
KeyValueContainer container = new KeyValueContainer(containerData,
conf);
populateContainerPathFields(container, maxSize);
container.importContainerData(rawContainerStream, packer);
return container;
}
}

View File

@ -35,6 +35,9 @@
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication
.OnDemandContainerReplicationSource;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,7 +82,7 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
context);
server = new XceiverServerSpi[]{
new XceiverServerGrpc(datanodeDetails, this.config, this
.hddsDispatcher),
.hddsDispatcher, createReplicationService()),
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
.config, hddsDispatcher)
};
@ -87,6 +90,10 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
}
private GrpcReplicationService createReplicationService() {
return new GrpcReplicationService(
new OnDemandContainerReplicationSource(containerSet));
}
/**
* Build's container map.

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.Closeable;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
/**
* Service to download container data from other datanodes.
* <p>
* The implementation of this interface should copy the raw container data in
* compressed form to working directory.
* <p>
* A smart implementation would use multiple sources to do parallel download.
*/
public interface ContainerDownloader extends Closeable {
CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
List<DatanodeDetails> sources);
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.io.OutputStream;
/**
* Contract to prepare provide the container in binary form..
* <p>
* Prepare will be called when container is closed. An implementation could
* precache any binary representation of a container and store the pre packede
* images.
*/
public interface ContainerReplicationSource {
/**
* Prepare for the replication.
*
* @param containerId The name of the container the package.
*/
void prepare(long containerId);
/**
* Copy the container data to an output stream.
*
* @param containerId Container to replicate
* @param destination The destination stream to copy all the container data.
* @throws IOException
*/
void copyData(long containerId, OutputStream destination)
throws IOException;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
/**
* JAX-RS streaming output to return the binary container data.
*/
public class ContainerStreamingOutput implements StreamingOutput {
private long containerId;
private ContainerReplicationSource containerReplicationSource;
public ContainerStreamingOutput(long containerId,
ContainerReplicationSource containerReplicationSource) {
this.containerId = containerId;
this.containerReplicationSource = containerReplicationSource;
}
@Override
public void write(OutputStream outputStream)
throws IOException, WebApplicationException {
containerReplicationSource.copyData(containerId, outputStream);
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto
.IntraDatanodeProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto
.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import com.google.common.base.Preconditions;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Client to read container data from Grpc.
*/
public class GrpcReplicationClient {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcReplicationClient.class);
private final ManagedChannel channel;
private final IntraDatanodeProtocolServiceStub client;
private final Path workingDirectory;
public GrpcReplicationClient(String host,
int port, Path workingDir) {
channel = NettyChannelBuilder.forAddress(host, port)
.usePlaintext()
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.build();
client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
this.workingDirectory = workingDir;
}
public CompletableFuture<Path> download(long containerId) {
CopyContainerRequestProto request =
CopyContainerRequestProto.newBuilder()
.setContainerID(containerId)
.setLen(-1)
.setReadOffset(0)
.build();
CompletableFuture<Path> response = new CompletableFuture<>();
Path destinationPath =
getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
client.download(request,
new StreamDownloader(containerId, response, destinationPath));
return response;
}
private Path getWorkingDirectory() {
return workingDirectory;
}
public void shutdown() {
channel.shutdown();
}
/**
* Grpc stream observer to ComletableFuture adapter.
*/
public static class StreamDownloader
implements StreamObserver<CopyContainerResponseProto> {
private final CompletableFuture<Path> response;
private final long containerId;
private BufferedOutputStream stream;
private Path outputPath;
public StreamDownloader(long containerId, CompletableFuture<Path> response,
Path outputPath) {
this.response = response;
this.containerId = containerId;
this.outputPath = outputPath;
try {
outputPath = Preconditions.checkNotNull(outputPath);
Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
Files.createDirectories(parentPath);
stream =
new BufferedOutputStream(new FileOutputStream(outputPath.toFile()));
} catch (IOException e) {
throw new RuntimeException("OutputPath can't be used: " + outputPath,
e);
}
}
@Override
public void onNext(CopyContainerResponseProto chunk) {
try {
stream.write(chunk.getData().toByteArray());
} catch (IOException e) {
response.completeExceptionally(e);
}
}
@Override
public void onError(Throwable throwable) {
try {
stream.close();
LOG.error("Container download was unsuccessfull", throwable);
try {
Files.delete(outputPath);
} catch (IOException ex) {
LOG.error(
"Error happened during the download but can't delete the "
+ "temporary destination.", ex);
}
response.completeExceptionally(throwable);
} catch (IOException e) {
response.completeExceptionally(e);
}
}
@Override
public void onCompleted() {
try {
stream.close();
response.complete(outputPath);
LOG.info("Container is downloaded to {}", outputPath);
} catch (IOException e) {
response.completeExceptionally(e);
}
}
}
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto
.IntraDatanodeProtocolServiceGrpc;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to make containers available for replication.
*/
public class GrpcReplicationService extends
IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcReplicationService.class);
private final ContainerReplicationSource containerReplicationSource;
public GrpcReplicationService(
ContainerReplicationSource containerReplicationSource) {
this.containerReplicationSource = containerReplicationSource;
}
@Override
public void download(CopyContainerRequestProto request,
StreamObserver<CopyContainerResponseProto> responseObserver) {
LOG.info("Streaming container data ({}) to other datanode",
request.getContainerID());
try {
GrpcOutputStream outputStream =
new GrpcOutputStream(responseObserver, request.getContainerID());
containerReplicationSource
.copyData(request.getContainerID(), outputStream);
} catch (IOException e) {
LOG.error("Can't stream the container data", e);
responseObserver.onError(e);
}
}
private static class GrpcOutputStream extends OutputStream
implements Closeable {
private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024;
private final StreamObserver<CopyContainerResponseProto> responseObserver;
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private long containerId;
private int readOffset = 0;
private int writtenBytes;
GrpcOutputStream(
StreamObserver<CopyContainerResponseProto> responseObserver,
long containerId) {
this.responseObserver = responseObserver;
this.containerId = containerId;
}
@Override
public void write(int b) throws IOException {
try {
buffer.write(b);
if (buffer.size() > BUFFER_SIZE_IN_BYTES) {
flushBuffer(false);
}
} catch (Exception ex) {
responseObserver.onError(ex);
}
}
private void flushBuffer(boolean eof) {
if (buffer.size() > 0) {
CopyContainerResponseProto response =
CopyContainerResponseProto.newBuilder()
.setContainerID(containerId)
.setData(ByteString.copyFrom(buffer.toByteArray()))
.setEof(eof)
.setReadOffset(readOffset)
.setLen(buffer.size())
.build();
responseObserver.onNext(response);
readOffset += buffer.size();
writtenBytes += buffer.size();
buffer.reset();
}
}
@Override
public void close() throws IOException {
flushBuffer(true);
LOG.info("{} bytes written to the rpc stream from container {}",
writtenBytes, containerId);
responseObserver.onCompleted();
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A naive implementation of the replication source which creates a tar file
* on-demand without pre-create the compressed archives.
*/
public class OnDemandContainerReplicationSource
implements ContainerReplicationSource {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReplicationSource.class);
private ContainerSet containerSet;
private ContainerPacker packer = new TarContainerPacker();
public OnDemandContainerReplicationSource(
ContainerSet containerSet) {
this.containerSet = containerSet;
}
@Override
public void prepare(long containerId) {
}
@Override
public void copyData(long containerId, OutputStream destination)
throws IOException {
Container container = containerSet.getContainer(containerId);
Preconditions
.checkNotNull(container, "Container is not found " + containerId);
switch (container.getContainerType()) {
case KeyValueContainer:
packer.pack(container,
destination);
break;
default:
LOG.warn("Container type " + container.getContainerType()
+ " is not replicable as no compression algorithm for that.");
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple ContainerDownloaderImplementation to download the missing container
* from the first available datanode.
* <p>
* This is not the most effective implementation as it uses only one source
* for he container download.
*/
public class SimpleContainerDownloader implements ContainerDownloader {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleContainerDownloader.class);
private final Path workingDirectory;
private ExecutorService executor;
public SimpleContainerDownloader(Configuration conf) {
String workDirString =
conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
if (workDirString == null) {
workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
.resolve("container-copy");
} else {
workingDirectory = Paths.get(workDirString);
}
ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Container downloader thread - %d").build();
executor = Executors.newSingleThreadExecutor(build);
LOG.info("Starting container downloader service to copy "
+ "containers to replicate.");
}
@Override
public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
List<DatanodeDetails> sourceDatanodes) {
CompletableFuture<Path> result = null;
for (DatanodeDetails datanode : sourceDatanodes) {
try {
if (result == null) {
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.STANDALONE).getValue(),
workingDirectory);
result = grpcReplicationClient.download(containerId);
} else {
result = result.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> {
LOG.error("Error on replicating container: " + containerId, t);
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.STANDALONE).getValue(),
workingDirectory);
return grpcReplicationClient.download(containerId);
}).thenCompose(Function.identity());
}
} catch (Exception ex) {
LOG.error(String.format(
"Container %s download from datanode %s was unsuccessful. "
+ "Trying the next datanode", containerId, datanode), ex);
}
}
return result;
}
@Override
public void close() throws IOException {
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Can't stop container downloader gracefully", e);
}
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
/**
Classes to replicate container data between datanodes.
**/

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.TestGenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test replication command handler.
*/
public class TestReplicateContainerCommandHandler {
private static final String EXCEPTION_MESSAGE = "Oh my god";
private ReplicateContainerCommandHandler handler;
private StubDownloader downloader;
private ReplicateContainerCommand command;
private List<Long> importedContainerIds;
@Before
public void init() {
importedContainerIds = new ArrayList<>();
OzoneConfiguration conf = new OzoneConfiguration();
ContainerSet containerSet = Mockito.mock(ContainerSet.class);
ContainerDispatcher containerDispatcher =
Mockito.mock(ContainerDispatcher.class);
downloader = new StubDownloader();
handler = new ReplicateContainerCommandHandler(conf, containerSet,
containerDispatcher, downloader) {
@Override
protected void importContainer(long containerID, Path tarFilePath) {
importedContainerIds.add(containerID);
}
};
//the command
ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
command = new ReplicateContainerCommand(1L, datanodeDetails);
}
@Test
public void handle() throws TimeoutException, InterruptedException {
//GIVEN
//WHEN
handler.handle(command, null, Mockito.mock(StateContext.class), null);
TestGenericTestUtils
.waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
Assert.assertNotNull(downloader.futureByContainers.get(1L));
downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
TestGenericTestUtils
.waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
}
@Test
public void handleWithErrors() throws TimeoutException, InterruptedException {
//GIVEN
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(ReplicateContainerCommandHandler.LOG);
//WHEN
handler.handle(command, null, Mockito.mock(StateContext.class), null);
//THEN
TestGenericTestUtils
.waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
Assert.assertNotNull(downloader.futureByContainers.get(1L));
downloader.futureByContainers.get(1L)
.completeExceptionally(new IllegalArgumentException(
EXCEPTION_MESSAGE));
TestGenericTestUtils
.waitFor(() -> {
String output = logCapturer.getOutput();
return output.contains("unsuccessful") && output
.contains(EXCEPTION_MESSAGE); },
100,
2000);
}
private static class StubDownloader implements ContainerDownloader {
private Map<Long, CompletableFuture<Path>> futureByContainers =
new HashMap<>();
@Override
public void close() {
}
@Override
public CompletableFuture<Path> getContainerDataFromReplicas(
long containerId, List<DatanodeDetails> sources) {
CompletableFuture<Path> future = new CompletableFuture<>();
futureByContainers.put(containerId, future);
return future;
}
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Tests for command handlers.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.DatanodeBlockID;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
.writeChunkForContainer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Tests ozone containers replication.
*/
public class TestContainerReplication {
/**
* Set the timeout for every test.
*/
@Rule
public Timeout testTimeout = new Timeout(300000);
@Test
public void testContainerReplication() throws Exception {
//GIVEN
OzoneConfiguration conf = newOzoneConfiguration();
long containerId = 1L;
conf.setSocketAddr("hdls.datanode.http-address",
new InetSocketAddress("0.0.0.0", 0));
MiniOzoneCluster cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
.setRandomContainerPort(true).build();
cluster.waitForClusterToBeReady();
HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
//copy from the first datanode
List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
sourceDatanodes.add(firstDatanode.getDatanodeDetails());
Pipeline sourcePipelines =
ContainerTestHelper.createPipeline(sourceDatanodes);
//create a new client
XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
client.connect();
//New container for testing
TestOzoneContainer.createContainerForTesting(client, containerId);
ContainerCommandRequestProto requestProto =
writeChunkForContainer(client, containerId, 1024);
DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
// Put Key to the test container
ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
.getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
HddsDatanodeService destinationDatanode =
chooseDatanodeWithoutContainer(sourcePipelines,
cluster.getHddsDatanodes());
//WHEN: send the order to replicate the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
new ReplicateContainerCommand(containerId,
sourcePipelines.getMachines()));
Thread.sleep(3000);
OzoneContainer ozoneContainer =
destinationDatanode.getDatanodeStateMachine().getContainer();
Container container =
ozoneContainer
.getContainerSet().getContainer(containerId);
Assert.assertNotNull(
"Container is not replicated to the destination datanode",
container);
Assert.assertNotNull(
"ContainerData of the replicated container is null",
container.getContainerData());
long keyCount = ((KeyValueContainerData) container.getContainerData())
.getKeyCount();
KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
.getHandler(ContainerType.KeyValueContainer);
KeyData key = handler.getKeyManager()
.getKey(container, BlockID.getFromProtobuf(blockID));
Assert.assertNotNull(key);
Assert.assertEquals(1, key.getChunks().size());
Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
key.getChunks().get(0));
}
private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
List<HddsDatanodeService> dataNodes) {
for (HddsDatanodeService datanode : dataNodes) {
if (!pipeline.getMachines().contains(datanode.getDatanodeDetails())) {
return datanode;
}
}
throw new AssertionError("No datanode outside of the pipeline");
}
static OzoneConfiguration newOzoneConfiguration() {
final OzoneConfiguration conf = new OzoneConfiguration();
return conf;
}
}

View File

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_SIZE;
import org.junit.Test;
/**
* Tests the behavior of the datanode, when replicate container command is
* received.
*/
public class TestReplicateContainerHandler {
@Test
public void test() throws IOException, TimeoutException, InterruptedException,
OzoneException {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(ReplicateContainerCommandHandler.LOG);
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
MiniOzoneCluster cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
DatanodeDetails datanodeDetails =
cluster.getHddsDatanodes().get(0).getDatanodeDetails();
//send the order to replicate the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new ReplicateContainerCommand(1L,
new ArrayList<>()));
//TODO: here we test only the serialization/unserialization as
// the implementation is not yet done
GenericTestUtils
.waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
5 * 1000);
}
}

View File

@ -513,7 +513,7 @@ private static XceiverClientGrpc createClientForTesting(
return new XceiverClientGrpc(pipeline, conf);
}
private static void createContainerForTesting(XceiverClientSpi client,
public static void createContainerForTesting(XceiverClientSpi client,
long containerID) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request =
@ -525,7 +525,7 @@ private static void createContainerForTesting(XceiverClientSpi client,
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}
private static ContainerProtos.ContainerCommandRequestProto
public static ContainerProtos.ContainerCommandRequestProto
writeChunkForContainer(XceiverClientSpi client,
long containerID, int dataLen) throws Exception {
// Write Chunk