HDDS-1898. GrpcReplicationService#download cannot replicate the container. (#1326)
parent 172bcd8e01
commit 2b16d5377c
Handler.java:

@@ -19,8 +19,9 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -109,14 +110,23 @@ public abstract ContainerCommandResponseProto handle(
       DispatcherContext dispatcherContext);
 
   /**
-   * Import container data from a raw input stream.
+   * Imports container from a raw input stream.
    */
   public abstract Container importContainer(
       long containerID,
       long maxSize,
       String originPipelineId,
       String originNodeId,
-      FileInputStream rawContainerStream,
+      InputStream rawContainerStream,
       TarContainerPacker packer)
       throws IOException;
 
+  /**
+   * Exports container to the output stream.
+   */
+  public abstract void exportContainer(
+      Container container,
+      OutputStream outputStream,
+      TarContainerPacker packer)
+      throws IOException;
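The signature change above is the core of the fix: as the commit title suggests, the gRPC download path hands the datanode a generic byte stream rather than a FileInputStream over a local file, so the old contract could not be satisfied. A minimal caller sketch under that reading (all literal arguments below are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;

// Sketch: any InputStream -- for example one wrapping bytes received over
// gRPC -- can now be fed to importContainer. With the old FileInputStream
// parameter this call would not even have compiled.
final class ImportSketch {
  static Container importFromWire(Handler handler, byte[] tarball)
      throws IOException {
    InputStream rawContainerStream = new ByteArrayInputStream(tarball);
    return handler.importContainer(
        1L,                       // containerID (illustrative)
        5L * 1024 * 1024 * 1024,  // maxSize (illustrative, 5 GB)
        "pipeline-uuid",          // originPipelineId (illustrative)
        "node-uuid",              // originNodeId (illustrative)
        rawContainerStream,
        new TarContainerPacker());
  }
}
```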
DeleteBlocksCommandHandler.java:

@@ -127,7 +127,12 @@ public void handle(SCMCommand command, OzoneContainer container,
         case KeyValueContainer:
           KeyValueContainerData containerData = (KeyValueContainerData)
               cont.getContainerData();
-          deleteKeyValueContainerBlocks(containerData, entry);
+          cont.writeLock();
+          try {
+            deleteKeyValueContainerBlocks(containerData, entry);
+          } finally {
+            cont.writeUnlock();
+          }
           txResultBuilder.setContainerID(containerId)
               .setSuccess(true);
           break;
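The pattern introduced here recurs through the rest of the commit: mutating operations take the container write lock, exports take the read lock, so a container can be packed concurrently by readers but never while its blocks are being deleted underneath it. A self-contained sketch of that invariant using a plain ReentrantReadWriteLock (the Ozone Container interface exposes the equivalent readLock/writeLock methods seen in the diff):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the locking invariant this commit establishes across the
// datanode: export paths hold the read lock, block deletion holds the
// write lock, so exported tarballs see a consistent container.
final class ContainerLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void exportContainerData() {   // mirrors the read-locked export path
    lock.readLock().lock();
    try {
      // pack the container data to the destination stream
    } finally {
      lock.readLock().unlock();
    }
  }

  void deleteBlocks() {          // mirrors deleteKeyValueContainerBlocks' caller
    lock.writeLock().lock();
    try {
      // remove blocks marked for deletion from the container DB
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```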
KeyValueContainer.java:

@@ -330,6 +330,9 @@ public void close() throws StorageContainerException {
     } finally {
       writeUnlock();
     }
+    LOG.info("Container {} is closed with bcsId {}.",
+        containerData.getContainerID(),
+        containerData.getBlockCommitSequenceId());
   }
 
   /**
@@ -361,13 +364,10 @@ private void updateContainerData(Runnable update)
     }
   }
 
-  void compactDB() throws StorageContainerException {
+  private void compactDB() throws StorageContainerException {
     try {
       try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
         db.getStore().compactDB();
-        LOG.info("Container {} is closed with bcsId {}.",
-            containerData.getContainerID(),
-            containerData.getBlockCommitSequenceId());
       }
     } catch (StorageContainerException ex) {
       throw ex;
@@ -524,6 +524,7 @@ public void exportContainerData(OutputStream destination,
           "Only closed containers could be exported: ContainerId="
               + getContainerData().getContainerID());
     }
+    compactDB();
     packer.pack(this, destination);
   }
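Two behavioral changes land in this file: the "container closed" message is now logged once from close() instead of on every DB compaction, and exportContainerData() compacts the container DB before packing, so the exported archive does not drag along deleted entries. A condensed sketch of the resulting export path (the exception type and helper methods are simplified stand-ins, not the Ozone signatures):

```java
import java.io.IOException;
import java.io.OutputStream;

// Condensed model of KeyValueContainer's export path after this commit:
// refuse to export anything that is not CLOSED, compact the container DB,
// then stream the packed bytes out. compactDB() was narrowed to private
// since it is now only invoked from within the class.
abstract class ExportPathSketch {
  abstract boolean isClosed();
  abstract void compactDB() throws IOException;           // stand-in for the private method
  abstract void pack(OutputStream destination) throws IOException;

  final void exportContainerData(OutputStream destination) throws IOException {
    if (!isClosed()) {
      throw new IOException("Only closed containers could be exported");
    }
    compactDB();          // drop tombstoned entries before packing
    pack(destination);    // stream the compacted container out
  }
}
```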
KeyValueHandler.java:

@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -841,13 +842,14 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
       throw new StorageContainerException(msg, result);
   }
 
-  public Container importContainer(long containerID, long maxSize,
-      String originPipelineId,
-      String originNodeId,
-      FileInputStream rawContainerStream,
-      TarContainerPacker packer)
+  @Override
+  public Container importContainer(final long containerID,
+      final long maxSize, final String originPipelineId,
+      final String originNodeId, final InputStream rawContainerStream,
+      final TarContainerPacker packer)
       throws IOException {
 
     // TODO: Add layout version!
     KeyValueContainerData containerData =
         new KeyValueContainerData(containerID,
             maxSize, originPipelineId, originNodeId);
@@ -862,6 +864,20 @@ public Container importContainer(long containerID, long maxSize,
 
   }
 
+  @Override
+  public void exportContainer(final Container container,
+      final OutputStream outputStream,
+      final TarContainerPacker packer)
+      throws IOException {
+    container.readLock();
+    try {
+      final KeyValueContainer kvc = (KeyValueContainer) container;
+      kvc.exportContainerData(outputStream, packer);
+    } finally {
+      container.readUnlock();
+    }
+  }
+
   @Override
   public void markContainerForClose(Container container)
       throws IOException {
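With importContainer and exportContainer both speaking plain java.io streams, replication reduces to a stream-to-stream round trip. A self-contained illustration under that assumption (the Source/Sink interfaces below are stand-ins for the handler methods, not Ozone types):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative round trip: whatever the source side's export writes, the
// destination side's import can read back, because both ends now speak
// plain streams instead of requiring a local file.
final class RoundTripSketch {
  interface Source { void export(OutputStream out) throws IOException; }
  interface Sink   { void importFrom(InputStream in) throws IOException; }

  static void replicate(Source source, Sink sink) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    source.export(buffer);                        // source packs the container
    sink.importFrom(new ByteArrayInputStream(     // destination unpacks it
        buffer.toByteArray()));
  }
}
```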
BlockDeletingService.java:

@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -247,6 +248,9 @@ private class BlockDeletingTask
     @Override
     public BackgroundTaskResult call() throws Exception {
       ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      final Container container = ozoneContainer.getContainerSet()
+          .getContainer(containerData.getContainerID());
+      container.writeLock();
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
       try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
@@ -313,6 +317,8 @@ public BackgroundTaskResult call() throws Exception {
         }
         crr.addAll(succeedBlocks);
         return crr;
+      } finally {
+        container.writeUnlock();
       }
     }
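Note the shape of this change: the write lock is acquired before any timing or DB access begins, and the matching unlock was added as a finally on the existing try-with-resources, so every exit path, including an exception thrown while the DB handle is open, releases the lock. Schematically (the class and lock below are illustrative, not the Ozone types):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;

// Schematic of BlockDeletingTask#call after the commit: lock first,
// do the work inside try, unlock in finally. Exceptions from the body
// (e.g. a failed DB open) still release the lock.
final class DeletingTaskSketch implements Callable<String> {
  private final ReentrantLock containerLock = new ReentrantLock();

  @Override
  public String call() throws Exception {
    containerLock.lock();        // taken before any DB access or timing
    try {
      // scan the container DB and delete the blocks marked for deletion
      return "result";
    } finally {
      containerLock.unlock();    // released on every exit path
    }
  }
}
```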
ContainerController.java:

@@ -29,8 +29,9 @@
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -120,13 +121,20 @@ public void closeContainer(final long containerId) throws IOException {
 
   public Container importContainer(final ContainerType type,
       final long containerId, final long maxSize, final String originPipelineId,
-      final String originNodeId, final FileInputStream rawContainerStream,
+      final String originNodeId, final InputStream rawContainerStream,
       final TarContainerPacker packer)
       throws IOException {
     return handlers.get(type).importContainer(containerId, maxSize,
         originPipelineId, originNodeId, rawContainerStream, packer);
   }
 
+  public void exportContainer(final ContainerType type,
+      final long containerId, final OutputStream outputStream,
+      final TarContainerPacker packer) throws IOException {
+    handlers.get(type).exportContainer(
+        containerSet.getContainer(containerId), outputStream, packer);
+  }
+
   /**
    * Deletes a container given its Id.
    * @param containerId Id of the container to be deleted
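ContainerController stays a thin dispatcher: export, like import, is a one-line forward to the handler registered for the container's type. A minimal model of that shape (types reduced to a local enum and functional interface; handler registration omitted):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumMap;
import java.util.Map;

// Minimal model of the controller's dispatch: one handler per container
// type, and exportContainer is a one-line forward to the matching handler.
final class DispatchSketch {
  enum ContainerType { KeyValueContainer }

  interface Handler {
    void exportContainer(long containerId, OutputStream out) throws IOException;
  }

  // In the real controller this map is populated at startup; registration
  // is omitted here for brevity.
  private final Map<ContainerType, Handler> handlers =
      new EnumMap<>(ContainerType.class);

  void exportContainer(ContainerType type, long containerId, OutputStream out)
      throws IOException {
    handlers.get(type).exportContainer(containerId, out);
  }
}
```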
OnDemandContainerReplicationSource.java:

@@ -21,7 +21,6 @@
 import java.io.OutputStream;
 
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 import com.google.common.base.Preconditions;
@@ -41,7 +40,7 @@ public class OnDemandContainerReplicationSource
 
   private ContainerController controller;
 
-  private ContainerPacker packer = new TarContainerPacker();
+  private TarContainerPacker packer = new TarContainerPacker();
 
   public OnDemandContainerReplicationSource(
       ContainerController controller) {
@@ -59,18 +58,11 @@ public void copyData(long containerId, OutputStream destination)
 
     Container container = controller.getContainer(containerId);
 
-    Preconditions
-        .checkNotNull(container, "Container is not found " + containerId);
+    Preconditions.checkNotNull(
+        container, "Container is not found " + containerId);
 
-    switch (container.getContainerType()) {
-    case KeyValueContainer:
-      packer.pack(container,
-          destination);
-      break;
-    default:
-      LOG.warn("Container type " + container.getContainerType()
-          + " is not replicable as no compression algorithm for that.");
-    }
+    controller.exportContainer(
+        container.getContainerType(), containerId, destination, packer);
 
   }
 }
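This file is where the replication source stops doing its own packing: the old copyData packed KeyValueContainers inline and merely warned for other types, bypassing the handler entirely, while the new version delegates to controller.exportContainer so the read-locked, compacted export path is used. A reduced sketch of the new flow (the Controller interface is a stand-in, and Objects.requireNonNull stands in for Guava's Preconditions.checkNotNull):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

// Reduced sketch of the new copyData: validate that the container exists,
// then delegate to the controller, which routes through the per-type
// handler (read lock, compaction, and packing all happen there, not here).
final class CopyDataSketch {
  interface Controller {
    Object getContainer(long containerId);
    void exportContainer(long containerId, OutputStream destination)
        throws IOException;
  }

  private final Controller controller;

  CopyDataSketch(Controller controller) {
    this.controller = controller;
  }

  void copyData(long containerId, OutputStream destination) throws IOException {
    Objects.requireNonNull(controller.getContainer(containerId),
        "Container is not found " + containerId);
    controller.exportContainer(containerId, destination);
  }
}
```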
TestContainerReplication.java:

@@ -46,6 +46,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -119,6 +120,13 @@ public void testContainerReplication() throws Exception {
         chooseDatanodeWithoutContainer(sourcePipelines,
             cluster.getHddsDatanodes());
 
+    // Close the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(
+            sourceDatanodes.get(0).getUuid(),
+            new CloseContainerCommand(containerId,
+                sourcePipelines.getId(), true));
+
     //WHEN: send the order to replicate the container
     cluster.getStorageContainerManager().getScmNodeManager()
         .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
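The test change follows directly from the export precondition in KeyValueContainer ("Only closed containers could be exported"): the container must be closed on the source datanode before the replicate command is sent. A schematic of the required ordering (the interface below is illustrative, not the MiniOzoneCluster API):

```java
// Schematic of the test's given/when/then ordering: the export
// precondition forces the close command to be issued and processed
// before the replicate command.
interface TestFlowSketch {
  void sendCloseContainerCommand(long containerId);      // GIVEN: close first
  void sendReplicateContainerCommand(long containerId);  // WHEN: then replicate
  boolean destinationHasContainer(long containerId);     // THEN: assert arrival

  static void run(TestFlowSketch t, long containerId) {
    t.sendCloseContainerCommand(containerId);
    t.sendReplicateContainerCommand(containerId);
    if (!t.destinationHasContainer(containerId)) {
      throw new AssertionError("container was not replicated");
    }
  }
}
```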