HDFS-13116. Ozone: Refactor Pipeline to have transport and container specific information. Contributed by Mukul Kumar Singh.

This commit is contained in:
Anu Engineer 2018-02-09 17:17:11 -08:00 committed by Owen O'Malley
parent 3cf07b43bc
commit ee5495456e
14 changed files with 419 additions and 372 deletions

View File

@ -27,17 +27,15 @@
import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* A pipeline represents the group of machines over which a container lives.
@ -48,7 +46,7 @@ public class Pipeline {
static {
ObjectMapper mapper = new ObjectMapper();
String[] ignorableFieldNames = {"data", "leaderID", "datanodes"};
String[] ignorableFieldNames = {"data"};
FilterProvider filters = new SimpleFilterProvider()
.addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter
.serializeAllExcept(ignorableFieldNames));
@ -60,25 +58,22 @@ public class Pipeline {
}
private String containerName;
private String leaderID;
private Map<String, DatanodeID> datanodes;
private OzoneProtos.LifeCycleState lifeCycleState;
private OzoneProtos.ReplicationType type;
private OzoneProtos.ReplicationFactor factor;
private String pipelineName;
private PipelineChannel pipelineChannel;
/**
* Allows you to maintain private data on pipelines. This is not serialized
* via protobuf, just allows us to maintain some private data.
*/
@JsonIgnore
private byte[] data;
/**
* Constructs a new pipeline data structure.
*
* @param leaderID - First machine in this pipeline.
* @param containerName - Container
* @param pipelineChannel - transport information for this container
*/
public Pipeline(String leaderID) {
this.leaderID = leaderID;
datanodes = new TreeMap<>();
public Pipeline(String containerName, PipelineChannel pipelineChannel) {
this.containerName = containerName;
this.pipelineChannel = pipelineChannel;
data = null;
}
@ -90,36 +85,13 @@ public Pipeline(String leaderID) {
*/
public static Pipeline getFromProtoBuf(OzoneProtos.Pipeline pipeline) {
Preconditions.checkNotNull(pipeline);
Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
}
newPipeline.setContainerName(pipeline.getContainerName());
newPipeline.setLifeCycleState(pipeline.getState());
newPipeline.setType(pipeline.getType());
newPipeline.setFactor(pipeline.getFactor());
if (pipeline.hasPipelineName()) {
newPipeline.setPipelineName(pipeline.getPipelineName());
}
return newPipeline;
PipelineChannel pipelineChannel =
PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel());
return new Pipeline(pipeline.getContainerName(), pipelineChannel);
}
public OzoneProtos.ReplicationFactor getFactor() {
return factor;
}
public void setFactor(OzoneProtos.ReplicationFactor factor) {
this.factor = factor;
}
/**
* Adds a member to the pipeline.
*
* @param dataNodeId - Datanode to be added.
*/
public void addMember(DatanodeID dataNodeId) {
datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
return pipelineChannel.getFactor();
}
/**
@ -129,7 +101,7 @@ public void addMember(DatanodeID dataNodeId) {
*/
@JsonIgnore
public DatanodeID getLeader() {
return datanodes.get(leaderID);
return pipelineChannel.getDatanodes().get(pipelineChannel.getLeaderID());
}
/**
@ -138,7 +110,8 @@ public DatanodeID getLeader() {
* @return First Machine.
*/
public String getLeaderHost() {
return datanodes.get(leaderID).getHostName();
return pipelineChannel.getDatanodes()
.get(pipelineChannel.getLeaderID()).getHostName();
}
/**
@ -148,7 +121,7 @@ public String getLeaderHost() {
*/
@JsonIgnore
public List<DatanodeID> getMachines() {
return new ArrayList<>(datanodes.values());
return new ArrayList<>(pipelineChannel.getDatanodes().values());
}
/**
@ -158,7 +131,7 @@ public List<DatanodeID> getMachines() {
*/
public List<String> getDatanodeHosts() {
List<String> dataHosts = new ArrayList<>();
for (DatanodeID id : datanodes.values()) {
for (DatanodeID id : pipelineChannel.getDatanodes().values()) {
dataHosts.add(id.getHostName());
}
return dataHosts;
@ -173,22 +146,8 @@ public List<String> getDatanodeHosts() {
public OzoneProtos.Pipeline getProtobufMessage() {
OzoneProtos.Pipeline.Builder builder =
OzoneProtos.Pipeline.newBuilder();
for (DatanodeID datanode : datanodes.values()) {
builder.addMembers(datanode.getProtoBufMessage());
}
builder.setLeaderID(leaderID);
builder.setContainerName(this.containerName);
if (this.getLifeCycleState() != null) {
builder.setState(this.getLifeCycleState());
}
if (this.getType() != null) {
builder.setType(this.getType());
}
if (this.getFactor() != null) {
builder.setFactor(this.getFactor());
}
builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage());
return builder.build();
}
@ -201,15 +160,6 @@ public String getContainerName() {
return containerName;
}
/**
* Sets the container Name.
*
* @param containerName - Name of the container.
*/
public void setContainerName(String containerName) {
this.containerName = containerName;
}
/**
* Returns private data that is set on this pipeline.
*
@ -223,6 +173,11 @@ public byte[] getData() {
}
}
@VisibleForTesting
public PipelineChannel getPipelineChannel() {
return pipelineChannel;
}
/**
* Set private data on pipeline.
*
@ -240,16 +195,7 @@ public void setData(byte[] data) {
* @return - LifeCycleStates.
*/
public OzoneProtos.LifeCycleState getLifeCycleState() {
return lifeCycleState;
}
/**
* Sets the lifecycleState.
*
* @param lifeCycleStates - Enum
*/
public void setLifeCycleState(OzoneProtos.LifeCycleState lifeCycleStates) {
this.lifeCycleState = lifeCycleStates;
return pipelineChannel.getLifeCycleState();
}
/**
@ -258,16 +204,7 @@ public void setLifeCycleState(OzoneProtos.LifeCycleState lifeCycleStates) {
* @return - Name of the pipeline
*/
public String getPipelineName() {
return pipelineName;
}
/**
* Sets the pipeline name.
*
* @param pipelineName - Sets the name.
*/
public void setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
return pipelineChannel.getName();
}
/**
@ -276,24 +213,16 @@ public void setPipelineName(String pipelineName) {
* @return type - Standalone, Ratis, Chained.
*/
public OzoneProtos.ReplicationType getType() {
return type;
}
/**
* Sets the type of this pipeline.
*
* @param type - Standalone, Ratis, Chained.
*/
public void setType(OzoneProtos.ReplicationType type) {
this.type = type;
return pipelineChannel.getType();
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append("[");
datanodes.keySet().stream()
.forEach(id -> b.append(id.endsWith(leaderID) ? "*" + id : id));
pipelineChannel.getDatanodes().keySet().stream()
.forEach(id -> b.
append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id));
b.append("] container:").append(containerName);
b.append(" name:").append(getPipelineName());
if (getType() != null) {

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import java.util.Map;
import java.util.TreeMap;
/**
* PipelineChannel information for a {@link Pipeline}.
*/
public class PipelineChannel {
@JsonIgnore
private String leaderID;
@JsonIgnore
private Map<String, DatanodeID> datanodes;
private LifeCycleState lifeCycleState;
private ReplicationType type;
private ReplicationFactor factor;
private String name;
public PipelineChannel(String leaderID, LifeCycleState lifeCycleState,
ReplicationType replicationType, ReplicationFactor replicationFactor,
String name) {
this.leaderID = leaderID;
this.lifeCycleState = lifeCycleState;
this.type = replicationType;
this.factor = replicationFactor;
this.name = name;
datanodes = new TreeMap<>();
}
public String getLeaderID() {
return leaderID;
}
public Map<String, DatanodeID> getDatanodes() {
return datanodes;
}
public LifeCycleState getLifeCycleState() {
return lifeCycleState;
}
public ReplicationType getType() {
return type;
}
public ReplicationFactor getFactor() {
return factor;
}
public String getName() {
return name;
}
public void addMember(DatanodeID dataNodeId) {
datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
}
@JsonIgnore
public OzoneProtos.PipelineChannel getProtobufMessage() {
OzoneProtos.PipelineChannel.Builder builder =
OzoneProtos.PipelineChannel.newBuilder();
for (DatanodeID datanode : datanodes.values()) {
builder.addMembers(datanode.getProtoBufMessage());
}
builder.setLeaderID(leaderID);
if (this.getLifeCycleState() != null) {
builder.setState(this.getLifeCycleState());
}
if (this.getType() != null) {
builder.setType(this.getType());
}
if (this.getFactor() != null) {
builder.setFactor(this.getFactor());
}
return builder.build();
}
public static PipelineChannel getFromProtoBuf(
OzoneProtos.PipelineChannel transportProtos) {
Preconditions.checkNotNull(transportProtos);
PipelineChannel pipelineChannel =
new PipelineChannel(transportProtos.getLeaderID(),
transportProtos.getState(),
transportProtos.getType(),
transportProtos.getFactor(),
transportProtos.getName());
for (HdfsProtos.DatanodeIDProto dataID : transportProtos.getMembersList()) {
pipelineChannel.addMember(DatanodeID.getFromProtoBuf(dataID));
}
return pipelineChannel;
}
}

View File

@ -29,16 +29,20 @@ option java_generate_equals_and_hash = true;
package hadoop.hdfs.ozone;
import "hdfs.proto";
// A pipeline is composed of one or more datanodes that back a container.
message Pipeline {
message PipelineChannel {
required string leaderID = 1;
repeated DatanodeIDProto members = 2;
required string containerName = 3;
optional LifeCycleState state = 4 [default = OPEN];
optional ReplicationType type = 5 [default = STAND_ALONE];
optional ReplicationFactor factor = 6 [default = ONE];
optional string pipelineName = 7;
optional LifeCycleState state = 3 [default = OPEN];
optional ReplicationType type = 4 [default = STAND_ALONE];
optional ReplicationFactor factor = 5 [default = ONE];
optional string name = 6;
}
// A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a
// container.
message Pipeline {
required string containerName = 1;
required PipelineChannel pipelineChannel = 2;
}
message KeyValue {

View File

@ -532,10 +532,12 @@ private void insertContainerDB(Connection conn, String containerName,
Pipeline pipeline, Set<String> uuidChecked) throws SQLException {
LOG.info("Insert to sql container db, for container {}", containerName);
String insertContainerInfo = String.format(
INSERT_CONTAINER_INFO, containerName, pipeline.getLeaderID());
INSERT_CONTAINER_INFO, containerName,
pipeline.getPipelineChannel().getLeaderID());
executeSQL(conn, insertContainerInfo);
for (HdfsProtos.DatanodeIDProto dnID : pipeline.getMembersList()) {
for (HdfsProtos.DatanodeIDProto dnID :
pipeline.getPipelineChannel().getMembersList()) {
String uuid = dnID.getDatanodeUuid();
if (!uuidChecked.contains(uuid)) {
// we may also not use this checked set, but catch exception instead

View File

@ -18,16 +18,32 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Manage Ozone pipelines.
*/
public interface PipelineManager {
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
private final List<PipelineChannel> activePipelineChannels;
private final AtomicInteger conduitsIndex;
public PipelineManager() {
activePipelineChannels = new LinkedList<>();
conduitsIndex = new AtomicInteger(0);
}
/**
* This function is called by the Container Manager while allocating a new
@ -39,31 +55,122 @@ public interface PipelineManager {
* @param replicationFactor - Replication Factor
* @return a Pipeline.
*/
Pipeline getPipeline(String containerName,
OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
public synchronized final Pipeline getPipeline(String containerName,
ReplicationFactor replicationFactor, ReplicationType replicationType)
throws IOException {
/**
* In the Ozone world, we have a very simple policy.
*
* 1. Try to create a pipelineChannel if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipelineChannel quickly.
*
* 3. if there are not enough free nodes, return conduits in a
* round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns conduits in round robin
* fashion.
*/
PipelineChannel pipelineChannel =
allocatePipelineChannel(replicationFactor);
if (pipelineChannel != null) {
LOG.debug("created new pipelineChannel:{} for container:{}",
pipelineChannel.getName(), containerName);
activePipelineChannels.add(pipelineChannel);
} else {
pipelineChannel =
findOpenPipelineChannel(replicationType, replicationFactor);
if (pipelineChannel != null) {
LOG.debug("re-used pipelineChannel:{} for container:{}",
pipelineChannel.getName(), containerName);
}
}
if (pipelineChannel == null) {
LOG.error("Get pipelineChannel call failed. We are not able to find" +
"free nodes or operational pipelineChannel.");
return null;
} else {
return new Pipeline(containerName, pipelineChannel);
}
}
protected int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:
return 1;
case THREE:
return 3;
default:
throw new IllegalArgumentException("Unexpected replication count");
}
}
public abstract PipelineChannel allocatePipelineChannel(
ReplicationFactor replicationFactor) throws IOException;
/**
* Find a PipelineChannel that is operational.
*
* @return - Pipeline or null
*/
private PipelineChannel findOpenPipelineChannel(
ReplicationType type, ReplicationFactor factor) {
PipelineChannel pipelineChannel = null;
final int sentinal = -1;
if (activePipelineChannels.size() == 0) {
LOG.error("No Operational conduits found. Returning null.");
return null;
}
int startIndex = getNextIndex();
int nextIndex = sentinal;
for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
// Just walk the list in a circular way.
PipelineChannel temp =
activePipelineChannels
.get(nextIndex != sentinal ? nextIndex : startIndex);
// if we find an operational pipelineChannel just return that.
if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
(temp.getFactor() == factor) && (temp.getType() == type)) {
pipelineChannel = temp;
break;
}
}
return pipelineChannel;
}
/**
* gets the next index of the PipelineChannel to get.
*
* @return index in the link list to get.
*/
private int getNextIndex() {
return conduitsIndex.incrementAndGet() % activePipelineChannels.size();
}
/**
* Creates a pipeline from a specified set of Nodes.
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
void createPipeline(String pipelineID, List<DatanodeID> datanodes)
throws IOException;;
public abstract void createPipeline(String pipelineID,
List<DatanodeID> datanodes) throws IOException;
/**
* Close the pipeline with the given clusterId.
*/
void closePipeline(String pipelineID) throws IOException;
public abstract void closePipeline(String pipelineID) throws IOException;
/**
* list members in the pipeline .
* @return the datanode
*/
List<DatanodeID> getMembers(String pipelineID) throws IOException;
public abstract List<DatanodeID> getMembers(String pipelineID)
throws IOException;
/**
* Update the datanode list of the pipeline.
*/
void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
throws IOException;
public abstract void updatePipeline(String pipelineID,
List<DatanodeID> newDatanodes) throws IOException;
}

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@ -29,6 +31,7 @@
import org.apache.hadoop.ozone.scm.pipelines.standalone.StandaloneManagerImpl;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,20 +83,19 @@ public PipelineSelector(NodeManager nodeManager, Configuration conf) {
* The first of the list will be the leader node.
* @return pipeline corresponding to nodes
*/
public static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes) {
public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes,
LifeCycleState state, ReplicationType replicationType,
ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
PipelineChannel
pipelineChannel = new PipelineChannel(leaderId, state, replicationType,
replicationFactor, name);
for (DatanodeID node : nodes) {
pipeline.addMember(node);
pipelineChannel.addMember(node);
}
// A Standalone pipeline is always open, no action from the client
// is needed to make it open.
pipeline.setType(ReplicationType.STAND_ALONE);
pipeline.setLifeCycleState(OzoneProtos.LifeCycleState.OPEN);
return pipeline;
return pipelineChannel;
}
/**
@ -167,7 +169,8 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline for {} : Replication {}",
containerName, replicationFactor.toString());
return manager.getPipeline(containerName, replicationFactor);
return manager.
getPipeline(containerName, replicationFactor, replicationType);
}
/**

View File

@ -19,7 +19,10 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@ -27,6 +30,7 @@
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,28 +40,19 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.LifeCycleState.OPEN;
/**
* Implementation of {@link PipelineManager}.
*
* TODO : Introduce a state machine.
*/
public class RatisManagerImpl implements PipelineManager {
public class RatisManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(RatisManagerImpl.class);
private final NodeManager nodeManager;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
private final Set<DatanodeID> ratisMembers;
private final List<Pipeline> activePipelines;
private final AtomicInteger pipelineIndex;
private static final String PREFIX = "Ratis-";
private final Configuration conf;
private final NodeManager nodeManager;
private final Set<DatanodeID> ratisMembers;
/**
* Constructs a Ratis Pipeline Manager.
@ -66,147 +61,22 @@ public class RatisManagerImpl implements PipelineManager {
*/
public RatisManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = size;
ratisMembers = new HashSet<>();
activePipelines = new LinkedList<>();
pipelineIndex = new AtomicInteger(0);
super();
this.conf = conf;
this.nodeManager = nodeManager;
ratisMembers = new HashSet<>();
}
/**
* This function is called by the Container Manager while allocation a new
* container. The client specifies what kind of replication pipeline is needed
* and based on the replication type in the request appropriate Interface is
* invoked.
* Allocates a new ratis PipelineChannel from the free nodes.
*
* @param containerName Name of the container
* @param replicationFactor - Replication Factor
* @return a Pipeline.
* <p>
* TODO: Evaulate if we really need this lock. Right now favoring safety over
* speed.
* @param factor - One or Three
* @return PipelineChannel.
*/
@Override
public synchronized Pipeline getPipeline(String containerName,
OzoneProtos.ReplicationFactor replicationFactor) throws IOException {
/**
* In the ratis world, we have a very simple policy.
*
* 1. Try to create a pipeline if there are enough free nodes.
*
* 2. This allows all nodes to part of a pipeline quickly.
*
* 3. if there are not enough free nodes, return pipelines in a
* round-robin fashion.
*
* TODO: Might have to come up with a better algorithm than this.
* Create a new placement policy that returns pipelines in round robin
* fashion.
*/
Pipeline pipeline = null;
List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
if (newNodes != null) {
Preconditions.checkState(newNodes.size() ==
getReplicationCount(replicationFactor), "Replication factor " +
"does not match the expected node count.");
pipeline =
allocateRatisPipeline(newNodes, containerName, replicationFactor);
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client
.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
}
} else {
Pipeline openPipeline = findOpenPipeline(replicationFactor);
if (openPipeline != null) {
// if an open pipeline is found use the same machines
pipeline = allocateRatisPipeline(openPipeline.getMachines(),
containerName, replicationFactor);
}
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find free nodes" +
" or operational pipeline.");
}
return pipeline;
}
/**
* Find a pipeline that is operational.
*
* @return - Pipeline or null
*/
Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) {
Pipeline pipeline = null;
final int sentinal = -1;
if (activePipelines.size() == 0) {
LOG.error("No Operational pipelines found. Returning null.");
return pipeline;
}
int startIndex = getNextIndex();
int nextIndex = sentinal;
for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
// Just walk the list in a circular way.
Pipeline temp =
activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
// if we find an operational pipeline just return that.
if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) {
pipeline = temp;
break;
}
}
return pipeline;
}
/**
* Allocate a new Ratis pipeline from the existing nodes.
*
* @param nodes - list of Nodes.
* @param containerName - container Name
* @return - Pipeline.
*/
Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName,
OzoneProtos.ReplicationFactor factor) {
Preconditions.checkNotNull(nodes);
Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes);
if (pipeline != null) {
// Start all pipeline names with "Ratis", easy to grep the logs.
String pipelineName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
pipeline.setType(OzoneProtos.ReplicationType.RATIS);
pipeline.setLifeCycleState(OPEN);
pipeline.setFactor(factor);
pipeline.setPipelineName(pipelineName);
pipeline.setContainerName(containerName);
LOG.info("Creating new ratis pipeline: {}", pipeline.toString());
activePipelines.add(pipeline);
}
return pipeline;
}
/**
* gets the next index of in the pipelines to get.
*
* @return index in the link list to get.
*/
private int getNextIndex() {
return pipelineIndex.incrementAndGet() % activePipelines.size();
}
/**
* Allocates a set of new nodes for the Ratis pipeline.
*
* @param replicationFactor - One or Three
* @return List of Datanodes.
*/
private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
replicationFactor) {
public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
List<DatanodeID> newNodesList = new LinkedList<>();
List<DatanodeID> datanodes =
nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY);
int count = getReplicationCount(replicationFactor);
List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
int count = getReplicationCount(factor);
//TODO: Add Raft State to the Nodes, so we can query and skip nodes from
// data from datanode instead of maintaining a set.
for (DatanodeID datanode : datanodes) {
@ -217,25 +87,28 @@ private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
LOG.info("Allocating a new pipeline of size: {}", count);
return newNodesList;
LOG.info("Allocating a new pipelineChannel of size: {}", count);
// Start all channel names with "Ratis", easy to grep the logs.
String conduitName = PREFIX +
UUID.randomUUID().toString().substring(PREFIX.length());
PipelineChannel pipelineChannel =
PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
Pipeline pipeline =
new Pipeline("setup", pipelineChannel);
try (XceiverClientRatis client =
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.createPipeline(pipeline.getPipelineName(), newNodesList);
} catch (IOException e) {
return null;
}
return pipelineChannel;
}
}
}
return null;
}
private int getReplicationCount(OzoneProtos.ReplicationFactor factor) {
switch (factor) {
case ONE:
return 1;
case THREE:
return 3;
default:
throw new IllegalArgumentException("Unexpected replication count");
}
}
/**
* Creates a pipeline from a specified set of Nodes.
*

View File

@ -16,30 +16,38 @@
*/
package org.apache.hadoop.ozone.scm.pipelines.standalone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.Set;
import java.util.HashSet;
import java.util.LinkedList;
/**
* Standalone Manager Impl to prove that pluggable interface
* works with current tests.
*/
public class StandaloneManagerImpl implements PipelineManager {
public class StandaloneManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(StandaloneManagerImpl.class);
private final NodeManager nodeManager;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
private final Set<DatanodeID> standAloneMembers;
/**
* Constructor for Standalone Node Manager Impl.
@ -49,34 +57,42 @@ public class StandaloneManagerImpl implements PipelineManager {
*/
public StandaloneManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long containerSize) {
super();
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;
this.standAloneMembers = new HashSet<>();
}
/**
* This function is called by the Container Manager while allocating a new
* container. The client specifies what kind of replication pipeline is needed
* and based on the replication type in the request appropriate Interface is
* invoked.
* Allocates a new standalone PipelineChannel from the free nodes.
*
* @param containerName Name of the container
* @param replicationFactor - Replication Factor
* @return a Pipeline.
* @param factor - One
* @return PipelineChannel.
*/
@Override
public Pipeline getPipeline(String containerName, OzoneProtos
.ReplicationFactor replicationFactor) throws IOException {
List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
replicationFactor.getNumber(), containerSize);
Pipeline pipeline = PipelineSelector.newPipelineFromNodes(datanodes);
String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
pipeline.setContainerName(containerName);
pipeline.setPipelineName(pipelineName);
pipeline.setFactor(replicationFactor);
LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
return pipeline;
public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
List<DatanodeID> newNodesList = new LinkedList<>();
List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
int count = getReplicationCount(factor);
for (DatanodeID datanode : datanodes) {
Preconditions.checkNotNull(datanode);
if (!standAloneMembers.contains(datanode)) {
newNodesList.add(datanode);
if (newNodesList.size() == count) {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
standAloneMembers.addAll(newNodesList);
LOG.info("Allocating a new pipeline channel of size: {}", count);
String channelName =
"SA-" + UUID.randomUUID().toString().substring(3);
return PipelineSelector.newPipelineFromNodes(newNodesList,
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, channelName);
}
}
}
return null;
}
/**

View File

@ -29,8 +29,12 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
@ -301,7 +305,10 @@ public void testRetryLog() throws IOException,
String data = RandomStringUtils.random(4 * KB);
List<Pipeline> fakeContainerPipelines = new LinkedList<>();
Pipeline fakePipeline = new Pipeline("fake");
PipelineChannel pipelineChannel = new PipelineChannel("fake",
LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
"fake");
Pipeline fakePipeline = new Pipeline("fake", pipelineChannel);
fakePipeline.setData(Longs.toByteArray(1));
fakeContainerPipelines.add(fakePipeline);

View File

@ -27,9 +27,11 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.TestGenericTestUtils;
@ -105,9 +107,13 @@ public void testStartMultipleDatanodes() throws Exception {
// Create a single member pipe line
String containerName = OzoneUtils.getRequestID();
DatanodeID dnId = dn.getDatanodeId();
Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid());
pipeline.addMember(dnId);
pipeline.setContainerName(containerName);
final PipelineChannel pipelineChannel =
new PipelineChannel(dnId.getDatanodeUuid(),
OzoneProtos.LifeCycleState.OPEN,
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE, "test");
pipelineChannel.addMember(dnId);
Pipeline pipeline = new Pipeline(containerName, pipelineChannel);
// Verify client is able to connect to the container
try (XceiverClient client = new XceiverClient(pipeline, conf)){

View File

@ -32,7 +32,11 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@ -119,16 +123,15 @@ public static Pipeline createPipeline(
final Iterator<DatanodeID> i = ids.iterator();
Preconditions.checkArgument(i.hasNext());
final DatanodeID leader = i.next();
final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
pipeline.setContainerName(containerName);
pipeline.addMember(leader);
pipeline.setFactor(OzoneProtos.ReplicationFactor.ONE);
pipeline.setType(OzoneProtos.ReplicationType.STAND_ALONE);
String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
final PipelineChannel pipelineChannel =
new PipelineChannel(leader.getDatanodeUuid(), LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
pipelineChannel.addMember(leader);
for(; i.hasNext();) {
pipeline.addMember(i.next());
pipelineChannel.addMember(i.next());
}
return pipeline;
return new Pipeline(containerName, pipelineChannel);
}
/**
@ -193,8 +196,9 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
ContainerProtos.WriteChunkRequestProto
.newBuilder();
pipeline.setContainerName(containerName);
writeRequest.setPipeline(pipeline.getProtobufMessage());
Pipeline newPipeline =
new Pipeline(containerName, pipeline.getPipelineChannel());
writeRequest.setPipeline(newPipeline.getProtobufMessage());
writeRequest.setKeyName(keyName);
byte[] data = getData(datalen);
@ -209,7 +213,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
request.setCmdType(ContainerProtos.Type.WriteChunk);
request.setWriteChunk(writeRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid().toString());
request.setDatanodeID(newPipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -228,7 +232,8 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
throws Exception {
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.PutSmallFileRequestProto.newBuilder();
pipeline.setContainerName(containerName);
Pipeline newPipeline =
new Pipeline(containerName, pipeline.getPipelineChannel());
byte[] data = getData(dataLen);
ChunkInfo info = getChunk(keyName, 0, 0, dataLen);
setDataChecksum(info, data);
@ -237,7 +242,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
putRequest.setPipeline(pipeline.getProtobufMessage());
putRequest.setPipeline(newPipeline.getProtobufMessage());
KeyData keyData = new KeyData(containerName, keyName);
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
@ -254,7 +259,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
request.setCmdType(ContainerProtos.Type.PutSmallFile);
request.setPutSmallFile(smallFileRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
request.setDatanodeID(newPipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -390,8 +395,7 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
containerData.setName(containerName);
String[] keys = metaData.keySet().toArray(new String[]{});
for(int i=0; i<keys.length; i++) {
OzoneProtos.KeyValue.Builder kvBuilder =
OzoneProtos.KeyValue.newBuilder();
KeyValue.Builder kvBuilder = KeyValue.newBuilder();
kvBuilder.setKey(keys[i]);
kvBuilder.setValue(metaData.get(keys[i]));
containerData.addMetadata(i, kvBuilder.build());

View File

@ -371,18 +371,19 @@ private ChunkInfo writeChunkHelper(String containerName, String keyName,
Pipeline pipeline) throws IOException,
NoSuchAlgorithmException {
final int datalen = 1024;
pipeline.setContainerName(containerName);
Pipeline newPipeline =
new Pipeline(containerName, pipeline.getPipelineChannel());
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner", "bilbo");
if(!containerManager.getContainerMap()
.containsKey(containerName)) {
containerManager.createContainer(pipeline, cData);
containerManager.createContainer(newPipeline, cData);
}
ChunkInfo info = getChunk(keyName, 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
chunkManager.writeChunk(newPipeline, keyName, info, data, COMBINED);
return info;
}
@ -420,7 +421,6 @@ public void testWritReadManyChunks() throws IOException,
Pipeline pipeline = createSingleNodePipeline(containerName);
Map<String, ChunkInfo> fileHashMap = new HashMap<>();
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
@ -484,7 +484,6 @@ public void testPartialRead() throws Exception {
String keyName = OzoneUtils.getRequestID();
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
@ -519,7 +518,6 @@ public void testOverWrite() throws IOException,
String keyName = OzoneUtils.getRequestID();
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
@ -562,7 +560,6 @@ public void testMultipleWriteSingleRead() throws IOException,
String keyName = OzoneUtils.getRequestID();
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
@ -601,7 +598,6 @@ public void testDeleteChunk() throws IOException,
String keyName = OzoneUtils.getRequestID();
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");

View File

@ -21,10 +21,14 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
@ -328,12 +332,14 @@ public void testDeletedBlockTransactions() throws IOException {
private void mockContainerInfo(Mapping mappingService, String containerName,
DatanodeID dnID) throws IOException {
Pipeline pipeline = new Pipeline("fake");
pipeline.addMember(dnID);
PipelineChannel pipelineChannel =
new PipelineChannel("fake", LifeCycleState.OPEN,
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
pipelineChannel.addMember(dnID);
Pipeline pipeline = new Pipeline(containerName, pipelineChannel);
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(pipeline);
builder.setContainerName(containerName);
ContainerInfo conatinerInfo = builder.build();
Mockito.doReturn(conatinerInfo).when(mappingService)

View File

@ -54,7 +54,6 @@
.HEALTHY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -164,32 +163,6 @@ public void testContainerPlacementCapacity() throws IOException,
.getPipeline();
assertEquals(xceiverClientManager.getFactor().getNumber(),
pipeline1.getMachines().size());
final long newUsed = 7L * OzoneConsts.GB;
final long newRemaining = capacity - newUsed;
for (DatanodeID datanodeID : datanodes) {
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(capacity).setScmUsed(newUsed).
setRemaining(newRemaining).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build(), reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining()
.get() == nodeCount * newRemaining,
100, 4 * 1000);
thrown.expect(IOException.class);
thrown.expectMessage(
startsWith("Unable to find enough nodes that meet "
+ "the space requirement"));
String container2 = UUID.randomUUID().toString();
containerManager.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2,
"OZONE");
} finally {
IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager);