HDDS-253. SCMBlockDeletingService should publish events for delete blocks to EventQueue. Contributed by Lokesh Jain.
This commit is contained in:
parent
3f3f72221f
commit
1fe5b93843
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
@ -87,10 +88,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
||||
* @param conf - configuration.
|
||||
* @param nodeManager - node manager.
|
||||
* @param containerManager - container manager.
|
||||
* @param eventPublisher - event publisher.
|
||||
* @throws IOException
|
||||
*/
|
||||
public BlockManagerImpl(final Configuration conf,
|
||||
final NodeManager nodeManager, final Mapping containerManager)
|
||||
final NodeManager nodeManager, final Mapping containerManager,
|
||||
EventPublisher eventPublisher)
|
||||
throws IOException {
|
||||
this.nodeManager = nodeManager;
|
||||
this.containerManager = containerManager;
|
||||
@ -120,9 +123,8 @@ public BlockManagerImpl(final Configuration conf,
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
blockDeletingService =
|
||||
new SCMBlockDeletingService(
|
||||
deletedBlockLog, containerManager, nodeManager, svcInterval,
|
||||
serviceTimeout, conf);
|
||||
new SCMBlockDeletingService(deletedBlockLog, containerManager,
|
||||
nodeManager, eventPublisher, svcInterval, serviceTimeout, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,11 +20,14 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.container.Mapping;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
@ -61,6 +64,7 @@ public class SCMBlockDeletingService extends BackgroundService {
|
||||
private final DeletedBlockLog deletedBlockLog;
|
||||
private final Mapping mappingService;
|
||||
private final NodeManager nodeManager;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
// Block delete limit size is dynamically calculated based on container
|
||||
// delete limit size (ozone.block.deleting.container.limit.per.interval)
|
||||
@ -76,13 +80,14 @@ public class SCMBlockDeletingService extends BackgroundService {
|
||||
private int blockDeleteLimitSize;
|
||||
|
||||
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
|
||||
Mapping mapper, NodeManager nodeManager,
|
||||
long interval, long serviceTimeout, Configuration conf) {
|
||||
Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher,
|
||||
long interval, long serviceTimeout, Configuration conf) {
|
||||
super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
|
||||
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
|
||||
this.deletedBlockLog = deletedBlockLog;
|
||||
this.mappingService = mapper;
|
||||
this.nodeManager = nodeManager;
|
||||
this.eventPublisher = eventPublisher;
|
||||
|
||||
int containerLimit = conf.getInt(
|
||||
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
|
||||
@ -145,8 +150,8 @@ public EmptyTaskResult call() throws Exception {
|
||||
// We should stop caching new commands if num of un-processed
|
||||
// command is bigger than a limit, e.g 50. In case datanode goes
|
||||
// offline for sometime, the cached commands be flooded.
|
||||
nodeManager.addDatanodeCommand(dnId,
|
||||
new DeleteBlocksCommand(dnTXs));
|
||||
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
|
||||
new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
|
||||
LOG.debug(
|
||||
"Added delete block command for datanode {} in the queue,"
|
||||
+ " number of delete block transactions: {}, TxID list: {}",
|
||||
|
@ -181,7 +181,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
|
||||
scmContainerManager = new ContainerMapping(
|
||||
conf, getScmNodeManager(), cacheSize);
|
||||
scmBlockManager = new BlockManagerImpl(
|
||||
conf, getScmNodeManager(), scmContainerManager);
|
||||
conf, getScmNodeManager(), scmContainerManager, eventQueue);
|
||||
|
||||
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
|
||||
|
||||
|
@ -74,7 +74,7 @@ public static void setUp() throws Exception {
|
||||
}
|
||||
nodeManager = new MockNodeManager(true, 10);
|
||||
mapping = new ContainerMapping(conf, nodeManager, 128);
|
||||
blockManager = new BlockManagerImpl(conf, nodeManager, mapping);
|
||||
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
|
||||
if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
|
||||
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){
|
||||
factor = HddsProtos.ReplicationFactor.THREE;
|
||||
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.scm;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
@ -117,7 +116,7 @@ public void setup() throws Exception {
|
||||
|
||||
nodeManager = cluster.getStorageContainerManager().getScmNodeManager();
|
||||
mapping = new ContainerMapping(conf, nodeManager, 128);
|
||||
blockManager = new BlockManagerImpl(conf, nodeManager, mapping);
|
||||
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
|
||||
|
||||
// blockManager.allocateBlock() will create containers if there is none
|
||||
// stored in levelDB. The number of containers to create is the value of
|
||||
|
Loading…
Reference in New Issue
Block a user