diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
index 33d361b8d7..a2384e835e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -73,7 +73,7 @@ public Configuration getConf() {
*
* @return - Return RPC timeout.
*/
- public long getRpcTimeout() {
+ public int getRpcTimeout() {
return rpcTimeout;
}
@@ -128,7 +128,7 @@ public void addSCMServer(InetSocketAddress address) throws IOException {
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
StorageContainerDatanodeProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+ NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4fa8729257..15a241edf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -19,10 +19,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
- .RunningDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import java.util.LinkedList;
import java.util.Queue;
@@ -34,6 +34,10 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReportState.states
+ .noContainerReports;
+
/**
* Current Context of State Machine.
*/
@@ -45,6 +49,9 @@ public class StateContext {
private final Configuration conf;
private DatanodeStateMachine.DatanodeStates state;
private SCMNodeReport nrState;
+ private ReportState reportState;
+ private static final ReportState DEFAULT_REPORT_STATE =
+ ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
/**
* Constructs a StateContext.
@@ -174,6 +181,7 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
if (isExiting(newState)) {
task.onExit();
}
+ this.clearReportState();
this.setState(newState);
}
}
@@ -215,4 +223,32 @@ public long getExecutionCount() {
}
+ /**
+ * Gets the ReportState, or the default (noContainerReports, 0) if unset.
+ * @return ReportState, never null.
+ */
+ public synchronized ReportState getContainerReportState() {
+ if (reportState == null) {
+ return DEFAULT_REPORT_STATE;
+ }
+ return reportState;
+ }
+
+ /**
+ * Sets the ReportState; null makes the getter fall back to the default.
+ * @param rState - ReportState, may be null.
+ */
+ public synchronized void setContainerReportState(ReportState rState) {
+ this.reportState = rState;
+ }
+
+ /**
+ * Resets the report state to null; a no-op when it is already null.
+ */
+ public synchronized void clearReportState() {
+ if(reportState != null) {
+ setContainerReportState(null);
+ }
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c0226bbf87..5aba6876c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -26,6 +26,13 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,12 +96,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
.containerNodeIDProto.getDatanodeID());
- rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID,
- this.context.getNodeReport());
+ SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
+ .sendHeartbeat(datanodeID, this.context.getNodeReport(),
+ this.context.getContainerReportState());
+ processResponse(reponse);
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
- rpcEndpoint.logIfNeeded(ex
- );
+ rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
}
@@ -109,6 +117,27 @@ public static Builder newBuilder() {
return new Builder();
}
+ /**
+ * Handles heartbeat commands: queues report requests, drops null commands.
+ *
+ * @param response - SCMHeartbeat response.
+ */
+ private void processResponse(SCMHeartbeatResponseProto response) {
+ for (SCMCommandResponseProto commandResponseProto : response
+ .getCommandsList()) {
+ if (commandResponseProto.getCmdType() ==
+ StorageContainerDatanodeProtocolProtos.Type.nullCmd) {
+ // Null commands are not queued on the context; they are only logged.
+ LOG.debug("Discarding a null command from SCM.");
+ }
+ if (commandResponseProto.getCmdType() ==
+ StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
+ this.context.addCommand(SendContainerCommand.getFromProtobuf(
+ commandResponseProto.getSendReport()));
+ }
+ }
+ }
+
/**
* Builder class for HeartbeatEndpointTask.
*/
@@ -138,8 +167,8 @@ public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
/**
* Sets the Config.
*
- * @param config - config
- * @return Builder
+ * @param config - config
+ * @return Builder
*/
public Builder setConfig(Configuration config) {
this.conf = config;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index e93baeffc5..76f359ca75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -18,6 +18,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
@@ -43,11 +44,12 @@ SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
* Used by data node to send a Heartbeat.
* @param datanodeID - Datanode ID.
* @param nodeReport - node report state
+ * @param reportState - container report state.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
- SCMNodeReport nodeReport) throws IOException;
+ SCMNodeReport nodeReport, ReportState reportState) throws IOException;
/**
* Register Datanode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java
new file mode 100644
index 0000000000..6a9fc44246
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.Type;
+
+/**
+ * Command that asks a Datanode to send in its container report.
+ */
+public class SendContainerCommand extends SCMCommand {
+ /**
+ * Returns a SendContainerCommand for a SendContainerReportProto.
+ * @param unused - unused, the proto message carries no fields.
+ * @return SendContainerCommand
+ */
+ public static SendContainerCommand getFromProtobuf(
+ final SendContainerReportProto unused) {
+ return new SendContainerCommand();
+ }
+
+ /**
+ * Returns a new builder.
+ * @return Builder
+ */
+ public static SendContainerCommand.Builder newBuilder() {
+ return new SendContainerCommand.Builder();
+ }
+
+ /**
+ * Returns the type of this command.
+ *
+ * @return Type.sendContainerReport
+ */
+ @Override
+ public Type getType() {
+ return Type.sendContainerReport;
+ }
+
+ /**
+ * Gets the protobuf message of this object.
+ *
+ * @return An empty SendContainerReportProto serialized to bytes.
+ */
+ @Override
+ public byte[] getProtoBufMessage() {
+ return SendContainerReportProto.newBuilder().build().toByteArray();
+ }
+
+ /**
+ * A Builder class; this is the standard pattern we use for all commands.
+ */
+ public static class Builder {
+ /**
+ * Returns a new SendContainerCommand.
+ * @return - SendContainerCommand.
+ */
+ public SendContainerCommand build() {
+ return new SendContainerCommand();
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index 477fac5ade..78b46cfc63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,6 +23,8 @@
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@@ -121,11 +123,12 @@ public SCMVersionResponseProto getVersion(SCMVersionRequestProto
@Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
- SCMNodeReport nodeReport) throws IOException {
+ SCMNodeReport nodeReport, ReportState reportState) throws IOException {
SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto
.newBuilder();
req.setDatanodeID(datanodeID.getProtoBufMessage());
req.setNodeReport(nodeReport);
+ req.setContainerReportState(reportState);
final SCMHeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 995290f788..ca13305af5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -78,7 +78,8 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
SCMHeartbeatRequestProto request) throws ServiceException {
try {
return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
- .getDatanodeID()), request.getNodeReport());
+ .getDatanodeID()), request.getNodeReport(),
+ request.getContainerReportState());
} catch (IOException e) {
throw new ServiceException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 3f31e98c36..5820e6decb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -36,6 +36,8 @@
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@@ -391,12 +393,14 @@ public SCMVersionResponseProto getVersion(
* Used by data node to send a Heartbeat.
*
* @param datanodeID - Datanode ID.
+ * @param nodeReport - Node Report
+ * @param reportState - Container report ready info.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
- SCMNodeReport nodeReport) throws IOException {
+ SCMNodeReport nodeReport, ReportState reportState) throws IOException {
List commands =
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
List cmdReponses = new LinkedList<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 346cb88ee8..6bcdb4eb05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -49,7 +49,7 @@ public class ContainerMapping implements Mapping {
private final long cacheSize;
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
- private final LevelDBStore store;
+ private final LevelDBStore containerStore;
private final Random rand;
/**
@@ -75,11 +75,14 @@ public ContainerMapping(Configuration conf, NodeManager nodeManager,
throw
new IllegalArgumentException("SCM metadata directory is not valid.");
}
- File dbPath = new File(scmMetaDataDir, "SCM.db");
Options options = new Options();
options.cacheSize(this.cacheSize * (1024L * 1024L));
options.createIfMissing();
- store = new LevelDBStore(dbPath, options);
+
+ // Write the container name to pipeline mapping.
+ File containerDBPath = new File(scmMetaDataDir, "container.db");
+ containerStore = new LevelDBStore(containerDBPath, options);
+
this.lock = new ReentrantLock();
rand = new Random();
}
@@ -103,6 +106,8 @@ private static Pipeline newPipelineFromNodes(DatanodeID node, String
return pipeline;
}
+
+
/**
* Returns the Pipeline from the container name.
*
@@ -114,7 +119,8 @@ public Pipeline getContainer(String containerName) throws IOException {
Pipeline pipeline = null;
lock.lock();
try {
- byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+ byte[] pipelineBytes =
+ containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes == null) {
throw new IOException("Specified key does not exist. key : " +
containerName);
@@ -145,7 +151,8 @@ public Pipeline allocateContainer(String containerName) throws IOException {
lock.lock();
try {
- byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+ byte[] pipelineBytes =
+ containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes != null) {
throw new IOException("Specified container already exists. key : " +
containerName);
@@ -153,7 +160,7 @@ public Pipeline allocateContainer(String containerName) throws IOException {
DatanodeID id = getDatanodeID();
if (id != null) {
pipeline = newPipelineFromNodes(id, containerName);
- store.put(containerName.getBytes(encoding),
+ containerStore.put(containerName.getBytes(encoding),
pipeline.getProtobufMessage().toByteArray());
}
} finally {
@@ -193,8 +200,8 @@ private DatanodeID getDatanodeID() throws IOException {
*/
@Override
public void close() throws IOException {
- if (store != null) {
- store.close();
+ if (containerStore != null) {
+ containerStore.close();
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
index fce86f7807..9a49fc5c13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
@@ -39,7 +39,8 @@ public class CommandQueue {
private final Map> commandMap;
private final Lock lock;
- private final List nullList;
+ // This list is used as default return value containing one null command.
+ private static final List DEFAULT_LIST = new LinkedList<>();
/**
* Constructs a Command Queue.
@@ -47,8 +48,7 @@ public class CommandQueue {
public CommandQueue() {
commandMap = new HashMap<>();
lock = new ReentrantLock();
- nullList = new LinkedList<>();
- nullList.add(NullCommand.newBuilder().build());
+ DEFAULT_LIST.add(NullCommand.newBuilder().build());
}
/**
@@ -75,7 +75,7 @@ List getCommand(final DatanodeID datanodeID) {
} finally {
lock.unlock();
}
- return nullList;
+ return DEFAULT_LIST;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index ee1378f1e8..38be4bea08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -47,15 +47,80 @@ import "DatanodeContainerProtocol.proto";
*/
message SCMHeartbeatRequestProto {
required DatanodeIDProto datanodeID = 1;
- optional SCMNodeReport nodeReport= 2;
+ optional SCMNodeReport nodeReport = 2;
+ optional ReportState containerReportState = 3;
}
+enum ContainerState {
+ closed = 0;
+ open = 1;
+}
+
+/**
+ReportState is sent by a datanode to tell SCM that it has some
+information (container reports) that SCM might be interested in.*/
+message ReportState {
+ enum states {
+ noContainerReports = 0;
+ completeContinerReport = 1;
+ deltaContainerReport = 2;
+ }
+ required states state = 1;
+ required int64 count = 2 [default = 0];
+}
+
+
+/**
+This message is used to persist the information about a container in the
+SCM database. This information allows SCM to start up faster and avoid
+having all container info in memory all the time.
+ */
+message ContainerPersistanceProto {
+ required ContainerState state = 1;
+ required hadoop.hdfs.ozone.Pipeline pipeline = 2;
+ required ContainerInfo info = 3;
+}
+
+/**
+This message is used to do a quick look up of which containers are affected
+if a node goes down.
+*/
+message NodeContianerMapping {
+ repeated string contianerName = 1;
+}
+
+
+
+/**
+A container report contains the following information.
+*/
+message ContainerInfo {
+ required string containerName = 1;
+ repeated bytes finalhash = 2;
+ optional int64 size = 3;
+ optional int64 keycount = 4;
+}
+
+/**
+A set of container reports, max count is generally set to
+8192 since that keeps the size of the reports under 1 MB.
+*/
+message ContainerReports {
+ enum reportType {
+ fullReport = 0;
+ deltaReport = 1;
+ }
+ repeated ContainerInfo reports = 1;
+ required reportType type = 2;
+}
+
+
/**
* This message is send along with the heart beat to report datanode
* storage utilization by SCM.
*/
message SCMNodeReport {
- repeated SCMStorageReport storageReport= 1;
+ repeated SCMStorageReport storageReport = 1;
}
message SCMStorageReport {
@@ -123,6 +188,12 @@ message NullCmdResponseProto {
}
+/**
+This command tells the data node to send in the container report when possible
+*/
+message SendContainerReportProto {
+}
+
/**
Type of commands supported by SCM to datanode protocol.
*/
@@ -130,6 +201,7 @@ enum Type {
nullCmd = 1;
versionCommand = 2;
registeredCommand = 3;
+ sendContainerReport = 4;
}
/*
@@ -138,6 +210,10 @@ enum Type {
message SCMCommandResponseProto {
required Type cmdType = 2; // Type of the command
optional NullCmdResponseProto nullCommand = 3;
+ optional SCMRegisteredCmdResponseProto registeredProto = 4;
+ optional SCMVersionResponseProto versionProto = 5;
+ optional SendContainerReportProto sendReport = 6;
+
}
@@ -157,9 +233,9 @@ message SCMHeartbeatResponseProto {
* Here is a simple state diagram that shows how a datanode would boot up and
* communicate with SCM.
*
- * +-----------------------+
+ * -----------------------
* | Start |
- * +----------+------------+
+ * ---------- ------------
* |
* |
* |
@@ -167,19 +243,19 @@ message SCMHeartbeatResponseProto {
* |
* |
* |
- * +----------v-------------+
- * | Searching for SCM +------------+
- * +----------+-------------+ |
+ * ----------v-------------
+ * | Searching for SCM ------------
+ * ---------- ------------- |
* | |
* | |
- * | +----------v-------------+
+ * | ----------v-------------
* | | Register if needed |
- * | +-----------+------------+
+ * | ----------- ------------
* | |
* v |
- * +-----------+----------------+ |
- * +---------+ Heartbeat state <--------+
- * | +--------^-------------------+
+ * ----------- ---------------- |
+ * --------- Heartbeat state <--------
+ * | --------^-------------------
* | |
* | |
* | |
@@ -187,7 +263,7 @@ message SCMHeartbeatResponseProto {
* | |
* | |
* | |
- * +------------------+
+ * ------------------
*
*
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index b20202e9d2..a9787cefa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -20,10 +20,9 @@
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
@@ -37,6 +36,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private int rpcResponseDelay;
private AtomicInteger heartbeatCount = new AtomicInteger(0);
private AtomicInteger rpcCount = new AtomicInteger(0);
+ private ReportState reportState;
/**
* Returns the number of heartbeats made to this class.
@@ -112,10 +112,11 @@ private void sleepIfNeeded() {
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
- sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport)
- throws IOException {
+ sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport,
+ ReportState reportState) throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
+ this.reportState = reportState;
sleepIfNeeded();
StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
cmdResponse = StorageContainerDatanodeProtocolProtos
@@ -153,4 +154,8 @@ private void sleepIfNeeded() {
StorageContainerDatanodeProtocolProtos
.SCMRegisteredCmdResponseProto.ErrorCode.success).build();
}
+
+ public ReportState getReportState() {
+ return this.reportState;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index d9e4cd78a8..b0e361604a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
@@ -48,6 +49,8 @@
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
/**
* Tests the datanode state machine class and its states.
@@ -65,6 +68,7 @@ public class TestDatanodeStateMachine {
@Before
public void setUp() throws Exception {
conf = SCMTestUtils.getConf();
+ conf.setInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, 500);
serverAddresses = new LinkedList<>();
scmServers = new LinkedList<>();
mockServers = new LinkedList<>();
@@ -194,9 +198,9 @@ public void testDatanodeStateContext() throws IOException,
// This execute will invoke getVersion calls against all SCM endpoints
// that we know of.
- task.execute(executorService);
- newState = task.await(2, TimeUnit.SECONDS);
+ task.execute(executorService);
+ newState = task.await(10, TimeUnit.SECONDS);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
@@ -246,8 +250,14 @@ public void testDatanodeStateContext() throws IOException,
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
+
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount());
+ // Assert that the heartbeat did indeed carry the report state that
+ // we expect the datanode to hold.
+ Assert.assertEquals(mock.getReportState().getState().getNumber(),
+ StorageContainerDatanodeProtocolProtos.ReportState.states
+ .noContainerReports.getNumber());
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 9404393faf..58f74f42cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -20,7 +20,8 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -30,16 +31,18 @@
.RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@@ -58,6 +61,9 @@
import java.util.UUID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReportState.states
+ .noContainerReports;
/**
* Tests the endpoints.
@@ -67,6 +73,27 @@ public class TestEndPoint {
private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl;
private static File testDir;
+ private static StorageContainerDatanodeProtocolProtos.ReportState
+ defaultReportState;
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (scmServer != null) {
+ scmServer.stop();
+ }
+ FileUtil.fullyDelete(testDir);
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ serverAddress = SCMTestUtils.getReuseableAddress();
+ scmServerImpl = new ScmTestMock();
+ scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+ scmServerImpl, serverAddress, 10);
+ testDir = PathUtils.getTestDir(TestEndPoint.class);
+ defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState.
+ newBuilder().setState(noContainerReports).setCount(0).build();
+ }
@Test
/**
@@ -255,7 +282,7 @@ public void testHeartbeat() throws Exception {
srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
nrb.addStorageReport(srb);
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
- .sendHeartbeat(dataNode, nrb.build());
+ .sendHeartbeat(dataNode, nrb.build(), defaultReportState);
Assert.assertNotNull(responseProto);
Assert.assertEquals(1, responseProto.getCommandsCount());
Assert.assertNotNull(responseProto.getCommandsList().get(0));
@@ -322,21 +349,4 @@ public void testHeartbeatTaskRpcTimeOut() throws Exception {
scmServerImpl.setRpcResponseDelay(0);
Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
}
-
- @AfterClass
- public static void tearDown() throws Exception {
- if (scmServer != null) {
- scmServer.stop();
- }
- FileUtil.fullyDelete(testDir);
- }
-
- @BeforeClass
- public static void setUp() throws Exception {
- serverAddress = SCMTestUtils.getReuseableAddress();
- scmServerImpl = new ScmTestMock();
- scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
- scmServerImpl, serverAddress, 10);
- testDir = PathUtils.getTestDir(TestEndPoint.class);
- }
}